# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import contextlib
import json
import logging
from typing import Any, AsyncIterator, Awaitable, Iterator, Optional, Union
from urllib.parse import urlencode

from . import _api_module
from . import _base_transformers as base_t
from . import _common
from . import _extra_utils
from . import _mcp_utils
from . import _transformers as t
from . import errors
from . import types
from ._api_client import BaseApiClient
from ._common import get_value_by_path as getv
from ._common import set_value_by_path as setv
from .pagers import AsyncPager, Pager

logger = logging.getLogger('google_genai.models')


def _PersonGeneration_to_mldev_enum_validate(enum_value: Any) -> None:
  if enum_value in set(['ALLOW_ALL']):
    raise ValueError(
        f'{enum_value} enum value is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )


def _SafetyFilterLevel_to_mldev_enum_validate(enum_value: Any) -> None:
  if enum_value in set(['BLOCK_NONE']):
    raise ValueError(
        f'{enum_value} enum value is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )


def _VideoGenerationReferenceType_to_mldev_enum_validate(
    enum_value: Any,
) -> None:
  if enum_value in set(['STYLE']):
    raise ValueError(
        f'{enum_value} enum value is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )


def _AuthConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API form of an auth config.

  Only `api_key` is carried over (as `apiKey`); every other auth field that
  is set causes a rejection.

  Args:
    from_object: Source auth config, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict containing `apiKey` when `api_key` is set, otherwise empty.

  Raises:
    ValueError: If a field that is only supported in Gemini Enterprise Agent
      Platform mode is set.
  """
  converted: dict[str, Any] = {}
  if (api_key := getv(from_object, ['api_key'])) is not None:
    setv(converted, ['apiKey'], api_key)

  # Checked in this order so the first offending field is the one reported.
  for field in (
      'api_key_config',
      'auth_type',
      'google_service_account_config',
      'http_basic_auth_config',
      'oauth_config',
      'oidc_config',
  ):
    if getv(from_object, [field]) is not None:
      raise ValueError(
          f'{field} parameter is only supported in Gemini Enterprise Agent'
          ' Platform mode, not in Gemini Developer API mode.'
      )

  return converted


def _Blob_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API form of a blob.

  Args:
    from_object: Source blob, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `data` and `mimeType` for whichever of `data` and
    `mime_type` are set.

  Raises:
    ValueError: If `display_name` is set.
  """
  blob: dict[str, Any] = {}

  payload = getv(from_object, ['data'])
  if payload is not None:
    setv(blob, ['data'], payload)

  if getv(from_object, ['display_name']) is not None:
    raise ValueError(
        'display_name parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  mime_type = getv(from_object, ['mime_type'])
  if mime_type is not None:
    setv(blob, ['mimeType'], mime_type)

  return blob


def _Candidate_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Gemini Developer API candidate to snake_case SDK keys.

  Args:
    from_object: Response candidate, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to `_CitationMetadata_from_mldev`.

  Returns:
    A new dict holding every candidate field that was present. Citation
    metadata goes through `_CitationMetadata_from_mldev`, safety ratings are
    copied into a new list, and all other values are passed through as-is.
  """
  candidate: dict[str, Any] = {}

  def _citations(raw: Any) -> dict[str, Any]:
    return _CitationMetadata_from_mldev(raw, candidate, root_object)

  # (response key, SDK key, converter); None means copy the value unchanged.
  # The order fixes the key order of the resulting dict.
  field_plan = (
      ('content', 'content', None),
      ('citationMetadata', 'citation_metadata', _citations),
      ('tokenCount', 'token_count', None),
      ('finishReason', 'finish_reason', None),
      ('groundingMetadata', 'grounding_metadata', None),
      ('avgLogprobs', 'avg_logprobs', None),
      ('index', 'index', None),
      ('logprobsResult', 'logprobs_result', None),
      ('safetyRatings', 'safety_ratings', list),
      ('urlContextMetadata', 'url_context_metadata', None),
  )
  for response_key, sdk_key, convert in field_plan:
    raw = getv(from_object, [response_key])
    if raw is None:
      continue
    setv(candidate, [sdk_key], raw if convert is None else convert(raw))

  return candidate


def _CitationMetadata_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts Gemini Developer API citation metadata to the SDK layout.

  Args:
    from_object: Response payload; `citationSources` is read from it.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict whose `citations` entry is a new list built from
    `citationSources`, or an empty dict when `getv` finds no value.
  """
  sources = getv(from_object, ['citationSources'])
  if sources is None:
    return {}

  metadata: dict[str, Any] = {}
  setv(metadata, ['citations'], list(sources))
  return metadata


def _CodeExecutionResult_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` form of a code execution result.

  Args:
    from_object: Source result, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `outcome` and `output` for whichever of them are set.

  Raises:
    ValueError: If `id` is set.
  """
  result: dict[str, Any] = {}
  for key in ('outcome', 'output'):
    value = getv(from_object, [key])
    if value is not None:
      setv(result, [key], value)

  if getv(from_object, ['id']) is not None:
    raise ValueError(
        'id parameter is only supported in Gemini Developer API mode, not in'
        ' Gemini Enterprise Agent Platform mode.'
    )

  return result


def _ComputeTokensParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` request dict for a compute-tokens call.

  Args:
    api_client: Client passed to `t.t_model` when resolving the model.
    from_object: Source parameters, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to `_Content_to_vertex`.

  Returns:
    A new dict with the model under `_url.model` and the converted
    `contents`, for whichever of them are set.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  contents = getv(from_object, ['contents'])
  if contents is not None:
    converted = []
    for content in t.t_contents(contents):
      converted.append(_Content_to_vertex(content, request, root_object))
    setv(request, ['contents'], converted)

  return request


def _ComputeTokensResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a `vertex` compute-tokens response to snake_case SDK keys.

  Args:
    from_object: Response payload, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `sdk_http_response` and `tokens_info` (a new list) for
    whichever of the source values are set.
  """
  response: dict[str, Any] = {}

  if (http_response := getv(from_object, ['sdkHttpResponse'])) is not None:
    setv(response, ['sdk_http_response'], http_response)

  if (tokens_info := getv(from_object, ['tokensInfo'])) is not None:
    setv(response, ['tokens_info'], list(tokens_info))

  return response


def _ContentEmbeddingStatistics_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Copies embedding statistics out of a `vertex` response.

  Args:
    from_object: Statistics payload, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `truncated` and `token_count` for whichever are set. The
    key names are the same on both sides.
  """
  statistics: dict[str, Any] = {}
  for key in ('truncated', 'token_count'):
    value = getv(from_object, [key])
    if value is not None:
      setv(statistics, [key], value)
  return statistics


def _ContentEmbedding_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts one `vertex` content embedding to the SDK layout.

  Args:
    from_object: Embedding payload, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to `_ContentEmbeddingStatistics_from_vertex`.

  Returns:
    A new dict with `values` passed through and `statistics` converted, for
    whichever of them are set.
  """
  embedding: dict[str, Any] = {}

  values = getv(from_object, ['values'])
  if values is not None:
    setv(embedding, ['values'], values)

  raw_statistics = getv(from_object, ['statistics'])
  if raw_statistics is not None:
    statistics = _ContentEmbeddingStatistics_from_vertex(
        raw_statistics, embedding, root_object
    )
    setv(embedding, ['statistics'], statistics)

  return embedding


def _Content_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `mldev` form of a content object.

  Args:
    from_object: Source content, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to `_Part_to_mldev`.

  Returns:
    A new dict with `parts` (each converted by `_Part_to_mldev`) and `role`,
    for whichever of them are set.
  """
  content: dict[str, Any] = {}

  parts = getv(from_object, ['parts'])
  if parts is not None:
    converted_parts = []
    for part in parts:
      converted_parts.append(_Part_to_mldev(part, content, root_object))
    setv(content, ['parts'], converted_parts)

  role = getv(from_object, ['role'])
  if role is not None:
    setv(content, ['role'], role)

  return content


def _Content_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` form of a content object.

  Args:
    from_object: Source content, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to `_Part_to_vertex`.

  Returns:
    A new dict with `parts` (each converted by `_Part_to_vertex`) and `role`,
    for whichever of them are set.
  """
  content: dict[str, Any] = {}

  parts = getv(from_object, ['parts'])
  if parts is not None:
    converted_parts = []
    for part in parts:
      converted_parts.append(_Part_to_vertex(part, content, root_object))
    setv(content, ['parts'], converted_parts)

  role = getv(from_object, ['role'])
  if role is not None:
    setv(content, ['role'], role)

  return content


def _ControlReferenceConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` form of a control reference config.

  Args:
    from_object: Source config, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict mapping `control_type` to `controlType` and
    `enable_control_image_computation` to `computeControl`, for whichever of
    them are set.
  """
  config: dict[str, Any] = {}
  renames = (
      ('control_type', 'controlType'),
      ('enable_control_image_computation', 'computeControl'),
  )
  for sdk_key, request_key in renames:
    value = getv(from_object, [sdk_key])
    if value is not None:
      setv(config, [request_key], value)
  return config


def _CountTokensConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Validates a count-tokens config for Gemini Developer API mode.

  No field is converted; the function only rejects fields that are set.

  Args:
    from_object: Source config, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    An empty dict.

  Raises:
    ValueError: If `system_instruction`, `tools` or `generation_config` is
      set. The first one found, in that order, is reported.
  """
  for field in ('system_instruction', 'tools', 'generation_config'):
    if getv(from_object, [field]) is not None:
      raise ValueError(
          f'{field} parameter is only supported in Gemini Enterprise Agent'
          ' Platform mode, not in Gemini Developer API mode.'
      )

  empty_config: dict[str, Any] = {}
  return empty_config


def _CountTokensConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes a count-tokens config into the parent `vertex` request.

  Args:
    from_object: Source config, read through `getv`.
    parent_object: Dict that receives `systemInstruction`, `tools` and
      `generationConfig` for whichever source fields are set.
    root_object: Forwarded to the nested converters.

  Returns:
    The local dict that is handed to the nested converters as their parent.
    This function itself never writes to it.
  """
  local_object: dict[str, Any] = {}

  system_instruction = getv(from_object, ['system_instruction'])
  if system_instruction is not None:
    instruction = _Content_to_vertex(
        t.t_content(system_instruction), local_object, root_object
    )
    setv(parent_object, ['systemInstruction'], instruction)

  tools = getv(from_object, ['tools'])
  if tools is not None:
    converted_tools = []
    for tool in tools:
      converted_tools.append(_Tool_to_vertex(tool, local_object, root_object))
    setv(parent_object, ['tools'], converted_tools)

  generation_config = getv(from_object, ['generation_config'])
  if generation_config is not None:
    converted_config = _GenerationConfig_to_vertex(
        generation_config, local_object, root_object
    )
    setv(parent_object, ['generationConfig'], converted_config)

  return local_object


def _CountTokensParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `mldev` request dict for a count-tokens call.

  Args:
    api_client: Client passed to `t.t_model` when resolving the model.
    from_object: Source parameters, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to the nested converters.

  Returns:
    A new dict with the model under `_url.model` and the converted
    `contents`, for whichever of them are set.

  Raises:
    ValueError: Propagated from `_CountTokensConfig_to_mldev` when the config
      sets a field it rejects.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  contents = getv(from_object, ['contents'])
  if contents is not None:
    converted = []
    for content in t.t_contents(contents):
      converted.append(_Content_to_mldev(content, request, root_object))
    setv(request, ['contents'], converted)

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter receives the request as its parent; its return
    # value is not used.
    _CountTokensConfig_to_mldev(config, request, root_object)

  return request


def _CountTokensParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` request dict for a count-tokens call.

  Args:
    api_client: Client passed to `t.t_model` when resolving the model.
    from_object: Source parameters, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to the nested converters.

  Returns:
    A new dict with the model under `_url.model`, the converted `contents`,
    and whatever `_CountTokensConfig_to_vertex` writes into it when a config
    is set.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  contents = getv(from_object, ['contents'])
  if contents is not None:
    converted = []
    for content in t.t_contents(contents):
      converted.append(_Content_to_vertex(content, request, root_object))
    setv(request, ['contents'], converted)

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes straight into the request (its parent);
    # its return value is not used.
    _CountTokensConfig_to_vertex(config, request, root_object)

  return request


def _CountTokensResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts an `mldev` count-tokens response to snake_case SDK keys.

  Args:
    from_object: Response payload, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `sdk_http_response`, `total_tokens` and
    `cached_content_token_count` for whichever source values are set.
  """
  response: dict[str, Any] = {}
  renames = (
      ('sdkHttpResponse', 'sdk_http_response'),
      ('totalTokens', 'total_tokens'),
      ('cachedContentTokenCount', 'cached_content_token_count'),
  )
  for response_key, sdk_key in renames:
    value = getv(from_object, [response_key])
    if value is not None:
      setv(response, [sdk_key], value)
  return response


def _CountTokensResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a `vertex` count-tokens response to snake_case SDK keys.

  Args:
    from_object: Response payload, read through `getv`.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `sdk_http_response` and `total_tokens` for whichever
    source values are set.
  """
  response: dict[str, Any] = {}
  renames = (
      ('sdkHttpResponse', 'sdk_http_response'),
      ('totalTokens', 'total_tokens'),
  )
  for response_key, sdk_key in renames:
    value = getv(from_object, [response_key])
    if value is not None:
      setv(response, [sdk_key], value)
  return response


def _DeleteModelParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `mldev` request dict for deleting a model.

  Args:
    api_client: Client passed to `t.t_model` when resolving the model.
    from_object: Source parameters; `model` is read from it.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with the resolved model under `_url.name`, or an empty dict
    when no model is set.
  """
  request: dict[str, Any] = {}
  if (model := getv(from_object, ['model'])) is not None:
    setv(request, ['_url', 'name'], t.t_model(api_client, model))
  return request


def _DeleteModelParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` request dict for deleting a model.

  Args:
    api_client: Client passed to `t.t_model` when resolving the model.
    from_object: Source parameters; `model` is read from it.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with the resolved model under `_url.name`, or an empty dict
    when no model is set.
  """
  request: dict[str, Any] = {}
  if (model := getv(from_object, ['model'])) is not None:
    setv(request, ['_url', 'name'], t.t_model(api_client, model))
  return request


def _DeleteModelResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts an `mldev` delete-model response to the SDK layout.

  Args:
    from_object: Response payload; `sdkHttpResponse` is read from it.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `sdk_http_response` when the source value is set,
    otherwise an empty dict.
  """
  response: dict[str, Any] = {}
  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)
  return response


def _DeleteModelResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a `vertex` delete-model response to the SDK layout.

  Args:
    from_object: Response payload; `sdkHttpResponse` is read from it.
    parent_object: Unused here.
    root_object: Unused here.

  Returns:
    A new dict with `sdk_http_response` when the source value is set,
    otherwise an empty dict.
  """
  response: dict[str, Any] = {}
  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)
  return response


def _EditImageConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes an edit-image config into the parent `vertex` request.

  Every config field that is set is copied unchanged to its destination path
  in `parent_object`. Most land under `parameters`; `labels` is written at
  the top level.

  Args:
    from_object: Source config, read through `getv`.
    parent_object: Dict that receives the values.
    root_object: Unused here.

  Returns:
    An empty dict; all output goes to `parent_object`.
  """
  result: dict[str, Any] = {}

  # (config field, destination path in the parent request), listed in the
  # order the values are written.
  routes = (
      ('output_gcs_uri', ['parameters', 'storageUri']),
      ('negative_prompt', ['parameters', 'negativePrompt']),
      ('number_of_images', ['parameters', 'sampleCount']),
      ('aspect_ratio', ['parameters', 'aspectRatio']),
      ('guidance_scale', ['parameters', 'guidanceScale']),
      ('seed', ['parameters', 'seed']),
      ('safety_filter_level', ['parameters', 'safetySetting']),
      ('person_generation', ['parameters', 'personGeneration']),
      ('include_safety_attributes', ['parameters', 'includeSafetyAttributes']),
      ('include_rai_reason', ['parameters', 'includeRaiReason']),
      ('language', ['parameters', 'language']),
      ('output_mime_type', ['parameters', 'outputOptions', 'mimeType']),
      (
          'output_compression_quality',
          ['parameters', 'outputOptions', 'compressionQuality'],
      ),
      ('add_watermark', ['parameters', 'addWatermark']),
      ('labels', ['labels']),
      ('edit_mode', ['parameters', 'editMode']),
      ('base_steps', ['parameters', 'editConfig', 'baseSteps']),
  )
  for field, destination in routes:
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, destination, value)

  return result


def _EditImageParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` request dict for an edit-image call.

  Args:
    api_client: Client passed to `t.t_model` when resolving the model.
    from_object: Source parameters, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to the nested converters.

  Returns:
    A new dict with the model under `_url.model`, the prompt and converted
    reference images under `instances[0]`, and whatever
    `_EditImageConfig_to_vertex` writes into it when a config is set.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(request, ['instances[0]', 'prompt'], prompt)

  reference_images = getv(from_object, ['reference_images'])
  if reference_images is not None:
    converted_images = []
    for image in reference_images:
      converted_images.append(
          _ReferenceImageAPI_to_vertex(image, request, root_object)
      )
    setv(request, ['instances[0]', 'referenceImages'], converted_images)

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes straight into the request (its parent);
    # its return value is not used.
    _EditImageConfig_to_vertex(config, request, root_object)

  return request


def _EditImageResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a `vertex` edit-image response to the SDK layout.

  Args:
    from_object: Response payload, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to `_GeneratedImage_from_vertex`.

  Returns:
    A new dict with `sdk_http_response` and `generated_images` (one converted
    entry per item in `predictions`), for whichever source values are set.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  predictions = getv(from_object, ['predictions'])
  if predictions is not None:
    images = []
    for prediction in predictions:
      images.append(
          _GeneratedImage_from_vertex(prediction, response, root_object)
      )
    setv(response, ['generated_images'], images)

  return response


def _EmbedContentConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes an embed-content config into the parent `mldev` request.

  Args:
    from_object: Source config, read through `getv`.
    parent_object: Dict that receives `taskType`, `title` and
      `outputDimensionality` under the `requests[]` path.
    root_object: Unused here.

  Returns:
    An empty dict; all output goes to `parent_object`.

  Raises:
    ValueError: If `mime_type`, `auto_truncate`, `document_ocr` or
      `audio_track_extraction` is set. The supported fields above have
      already been written to `parent_object` by then.
  """
  result: dict[str, Any] = {}

  supported = (
      ('task_type', 'taskType'),
      ('title', 'title'),
      ('output_dimensionality', 'outputDimensionality'),
  )
  for field, request_key in supported:
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, ['requests[]', request_key], value)

  # Checked after the writes, in this order, so the first offending field is
  # the one reported.
  for field in (
      'mime_type',
      'auto_truncate',
      'document_ocr',
      'audio_track_extraction',
  ):
    if getv(from_object, [field]) is not None:
      raise ValueError(
          f'{field} parameter is only supported in Gemini Enterprise Agent'
          ' Platform mode, not in Gemini Developer API mode.'
      )

  return result


def _EmbedContentConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes an embed-content config into the parent `vertex` request.

  The destination of each field depends on `embedding_api_type`, read from
  `root_object` and treated as 'PREDICT' when it is None:

  * 'PREDICT' writes under `instances[]` and `parameters`; `document_ocr`
    and `audio_track_extraction` are ignored.
  * 'EMBED_CONTENT' writes under `embedContentConfig`; `mime_type` is
    ignored.
  * Any other value writes nothing.

  Args:
    from_object: Source config, read through `getv`.
    parent_object: Dict that receives the values.
    root_object: Object from which `embedding_api_type` is read.

  Returns:
    An empty dict; all output goes to `parent_object`.
  """
  result: dict[str, Any] = {}

  api_type = getv(root_object, ['embedding_api_type'])
  if api_type is None:
    api_type = 'PREDICT'

  # (config field, destination path), listed in the order values are written.
  if api_type == 'PREDICT':
    routes = (
        ('task_type', ['instances[]', 'task_type']),
        ('title', ['instances[]', 'title']),
        ('output_dimensionality', ['parameters', 'outputDimensionality']),
        ('mime_type', ['instances[]', 'mimeType']),
        ('auto_truncate', ['parameters', 'autoTruncate']),
    )
  elif api_type == 'EMBED_CONTENT':
    routes = (
        ('task_type', ['embedContentConfig', 'taskType']),
        ('title', ['embedContentConfig', 'title']),
        (
            'output_dimensionality',
            ['embedContentConfig', 'outputDimensionality'],
        ),
        ('auto_truncate', ['embedContentConfig', 'autoTruncate']),
        ('document_ocr', ['embedContentConfig', 'documentOcr']),
        (
            'audio_track_extraction',
            ['embedContentConfig', 'audioTrackExtraction'],
        ),
    )
  else:
    routes = ()

  for field, destination in routes:
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, destination, value)

  return result


def _EmbedContentParametersPrivate_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `mldev` request dict for an embed-content call.

  Args:
    api_client: Client passed to `t.t_model` and `t.t_contents_for_embed`.
    from_object: Source parameters, read through `getv`.
    parent_object: Unused here.
    root_object: Forwarded to the nested converters.

  Returns:
    A new dict with the model under `_url.model`, the transformed contents
    under `requests[]`/`content`, whatever `_EmbedContentConfig_to_mldev`
    writes into it, and the model under `requests[]`/`model`.

  Raises:
    ValueError: Propagated from `_EmbedContentConfig_to_mldev` when the
      config sets a field it rejects.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  contents = getv(from_object, ['contents'])
  if contents is not None:
    setv(
        request,
        ['requests[]', 'content'],
        list(t.t_contents_for_embed(api_client, contents)),
    )

  content = getv(from_object, ['content'])
  if content is not None:
    # NOTE(review): the converted content is not stored anywhere; only the
    # call itself (and anything it raises) remains. Confirm this is intended.
    _Content_to_mldev(t.t_content(content), request, root_object)

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes straight into the request (its parent);
    # its return value is not used.
    _EmbedContentConfig_to_mldev(config, request, root_object)

  # Written unconditionally, so `t.t_model` is called even if model is None.
  setv(request, ['requests[]', 'model'], t.t_model(api_client, model))
  return request


def _EmbedContentParametersPrivate_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the `vertex` request dict for an embed-content call.

  Which input is used depends on `embedding_api_type`, read from
  `root_object` and treated as 'PREDICT' when it is None: 'PREDICT' takes
  `contents`, 'EMBED_CONTENT' takes `content`, and any other value takes
  neither.

  Args:
    api_client: Client passed to `t.t_model` and `t.t_contents_for_embed`.
    from_object: Source parameters, read through `getv`.
    parent_object: Unused here.
    root_object: Object from which `embedding_api_type` is read; also
      forwarded to the nested converters.

  Returns:
    A new dict with the model under `_url.model`, the selected input, and
    whatever `_EmbedContentConfig_to_vertex` writes into it when a config is
    set.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  api_type = getv(root_object, ['embedding_api_type'])
  if api_type is None:
    api_type = 'PREDICT'

  if api_type == 'PREDICT':
    contents = getv(from_object, ['contents'])
    if contents is not None:
      setv(
          request,
          ['instances[]', 'content'],
          list(t.t_contents_for_embed(api_client, contents)),
      )
  elif api_type == 'EMBED_CONTENT':
    content = getv(from_object, ['content'])
    if content is not None:
      converted_content = _Content_to_vertex(
          t.t_content(content), request, root_object
      )
      setv(request, ['content'], converted_content)

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes straight into the request (its parent);
    # its return value is not used.
    _EmbedContentConfig_to_vertex(config, request, root_object)

  return request


def _EmbedContentResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Maps an mldev embed-content response onto SDK field names.

  Args:
    from_object: Response to read `sdkHttpResponse`, `embeddings` and
      `metadata` from.
    parent_object: Not read by this function.
    root_object: Not read by this function.

  Returns:
    A dict holding `sdk_http_response`, `embeddings` and `metadata` for each
    source field that is not None.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  embeddings = getv(from_object, ['embeddings'])
  if embeddings is not None:
    # Store a new list rather than the one held by the source object.
    setv(response, ['embeddings'], list(embeddings))

  metadata = getv(from_object, ['metadata'])
  if metadata is not None:
    setv(response, ['metadata'], metadata)

  return response


def _EmbedContentResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Maps a Vertex embed-content response onto SDK field names.

  Embeddings found under `predictions[].embeddings` are converted one by one.
  When `root_object` reports `embedding_api_type == 'EMBED_CONTENT'`, the
  single `embedding` of the response is additionally given a `statistics`
  entry and stored as a one-element `embeddings` list.

  Args:
    from_object: Response to convert.
    parent_object: Not read by this function.
    root_object: Supplies `embedding_api_type`; forwarded to the embedding
      converter.

  Returns:
    The converted response dict.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  predicted = getv(from_object, ['predictions[]', 'embeddings'])
  if predicted is not None:
    converted = [
        _ContentEmbedding_from_vertex(entry, response, root_object)
        for entry in predicted
    ]
    setv(response, ['embeddings'], converted)

  metadata = getv(from_object, ['metadata'])
  if metadata is not None:
    setv(response, ['metadata'], metadata)

  if getv(root_object, ['embedding_api_type']) == 'EMBED_CONTENT':
    embedding = getv(from_object, ['embedding'])
    usage = getv(from_object, ['usageMetadata'])
    was_truncated = getv(from_object, ['truncated'])
    if embedding:
      statistics = {}
      # Falsy token counts and truncation flags are left out of the stats.
      prompt_tokens = usage.get('promptTokenCount') if usage else None
      if prompt_tokens:
        statistics['token_count'] = prompt_tokens
      if was_truncated:
        statistics['truncated'] = was_truncated
      # Note: this writes into the embedding object taken from `from_object`.
      embedding['statistics'] = statistics
      setv(response, ['embeddings'], [embedding])

  return response


def _Endpoint_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Maps a Vertex endpoint entry onto SDK field names.

  Args:
    from_object: Object to read `endpoint` and `deployedModelId` from.
    parent_object: Not read by this function.
    root_object: Not read by this function.

  Returns:
    A dict with `name` (from `endpoint`) and `deployed_model_id` for each
    source field that is not None.
  """
  endpoint: dict[str, Any] = {}

  for wire_key, sdk_key in (
      ('endpoint', 'name'),
      ('deployedModelId', 'deployed_model_id'),
  ):
    value = getv(from_object, [wire_key])
    if value is not None:
      setv(endpoint, [sdk_key], value)

  return endpoint


def _ExecutableCode_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts executable code into its Vertex request form.

  Args:
    from_object: Object to read `code`, `language` and `id` from.
    parent_object: Not read by this function.
    root_object: Not read by this function.

  Returns:
    A dict with `code` and `language` for each field that is not None.

  Raises:
    ValueError: If `id` is set.
  """
  executable_code: dict[str, Any] = {}

  # Both fields keep their key unchanged in the request.
  for key in ('code', 'language'):
    value = getv(from_object, [key])
    if value is not None:
      setv(executable_code, [key], value)

  if getv(from_object, ['id']) is not None:
    raise ValueError(
        'id parameter is only supported in Gemini Developer API mode, not in'
        ' Gemini Enterprise Agent Platform mode.'
    )

  return executable_code


def _FileData_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts file data into its Gemini Developer API request form.

  Args:
    from_object: Object to read `display_name`, `file_uri` and `mime_type`
      from.
    parent_object: Not read by this function.
    root_object: Not read by this function.

  Returns:
    A dict with `fileUri` and `mimeType` for each field that is not None.

  Raises:
    ValueError: If `display_name` is set.
  """
  # The unsupported field is rejected before anything is copied.
  if getv(from_object, ['display_name']) is not None:
    raise ValueError(
        'display_name parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  file_data: dict[str, Any] = {}
  for source_key, target_key in (
      ('file_uri', 'fileUri'),
      ('mime_type', 'mimeType'),
  ):
    value = getv(from_object, [source_key])
    if value is not None:
      setv(file_data, [target_key], value)

  return file_data


def _FunctionCall_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a function call into its Gemini Developer API request form.

  Args:
    from_object: Object to read the function call fields from.
    parent_object: Not read by this function.
    root_object: Not read by this function.

  Returns:
    A dict with `id`, `args` and `name` for each field that is not None.

  Raises:
    ValueError: If `partial_args` or `will_continue` is set.
  """
  function_call: dict[str, Any] = {}

  # These fields keep their key unchanged in the request.
  for key in ('id', 'args', 'name'):
    value = getv(from_object, [key])
    if value is not None:
      setv(function_call, [key], value)

  if getv(from_object, ['partial_args']) is not None:
    raise ValueError(
        'partial_args parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  if getv(from_object, ['will_continue']) is not None:
    raise ValueError(
        'will_continue parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  return function_call


def _FunctionCallingConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a function-calling config for Gemini Developer API mode.

  Args:
    from_object: Object to read the function-calling settings from.
    parent_object: Not read by this function.
    root_object: Not read by this function.

  Returns:
    A dict with `allowedFunctionNames` and `mode` for each field that is not
    None.

  Raises:
    ValueError: If `stream_function_call_arguments` is set.
  """
  calling_config: dict[str, Any] = {}

  allowed_names = getv(from_object, ['allowed_function_names'])
  if allowed_names is not None:
    setv(calling_config, ['allowedFunctionNames'], allowed_names)

  mode = getv(from_object, ['mode'])
  if mode is not None:
    setv(calling_config, ['mode'], mode)

  if getv(from_object, ['stream_function_call_arguments']) is not None:
    raise ValueError(
        'stream_function_call_arguments parameter is only supported in Gemini'
        ' Enterprise Agent Platform mode, not in Gemini Developer API mode.'
    )

  return calling_config


def _GenerateContentConfig_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a generate-content config for Gemini Developer API mode.

  Generation options are written to the returned dict. The system
  instruction, safety settings, tools, tool config, cached content and
  service tier are written to `parent_object` instead.

  Args:
    api_client: Client handed to the schema, tool and cached-content
      transformers.
    from_object: Config to convert.
    parent_object: Dict that receives the request-level fields.
    root_object: Forwarded unchanged to nested converters.

  Returns:
    The dict of generation options.

  Raises:
    ValueError: If `routing_config`, `model_selection_config`, `labels`,
      `audio_timestamp` or `model_armor_config` is set.
  """
  config: dict[str, Any] = {}

  if (instruction := getv(from_object, ['system_instruction'])) is not None:
    setv(
        parent_object,
        ['systemInstruction'],
        _Content_to_mldev(t.t_content(instruction), config, root_object),
    )

  # Options copied as-is; only the key changes to its camelCase form.
  for source_key, target_key in (
      ('temperature', 'temperature'),
      ('top_p', 'topP'),
      ('top_k', 'topK'),
      ('candidate_count', 'candidateCount'),
      ('max_output_tokens', 'maxOutputTokens'),
      ('stop_sequences', 'stopSequences'),
      ('response_logprobs', 'responseLogprobs'),
      ('logprobs', 'logprobs'),
      ('presence_penalty', 'presencePenalty'),
      ('frequency_penalty', 'frequencyPenalty'),
      ('seed', 'seed'),
      ('response_mime_type', 'responseMimeType'),
  ):
    if (option := getv(from_object, [source_key])) is not None:
      setv(config, [target_key], option)

  if (schema := getv(from_object, ['response_schema'])) is not None:
    setv(config, ['responseSchema'], t.t_schema(api_client, schema))

  if (json_schema := getv(from_object, ['response_json_schema'])) is not None:
    setv(config, ['responseJsonSchema'], json_schema)

  if getv(from_object, ['routing_config']) is not None:
    raise ValueError(
        'routing_config parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  if getv(from_object, ['model_selection_config']) is not None:
    raise ValueError(
        'model_selection_config parameter is only supported in Gemini'
        ' Enterprise Agent Platform mode, not in Gemini Developer API mode.'
    )

  if (safety_settings := getv(from_object, ['safety_settings'])) is not None:
    converted_settings = [
        _SafetySetting_to_mldev(setting, config, root_object)
        for setting in safety_settings
    ]
    setv(parent_object, ['safetySettings'], converted_settings)

  if (tools := getv(from_object, ['tools'])) is not None:
    converted_tools = [
        _Tool_to_mldev(t.t_tool(api_client, tool), config, root_object)
        for tool in t.t_tools(api_client, tools)
    ]
    setv(parent_object, ['tools'], converted_tools)

  if (tool_config := getv(from_object, ['tool_config'])) is not None:
    setv(
        parent_object,
        ['toolConfig'],
        _ToolConfig_to_mldev(tool_config, config, root_object),
    )

  if getv(from_object, ['labels']) is not None:
    raise ValueError(
        'labels parameter is only supported in Gemini Enterprise Agent Platform'
        ' mode, not in Gemini Developer API mode.'
    )

  if (cached_content := getv(from_object, ['cached_content'])) is not None:
    setv(
        parent_object,
        ['cachedContent'],
        t.t_cached_content_name(api_client, cached_content),
    )

  if (modalities := getv(from_object, ['response_modalities'])) is not None:
    setv(config, ['responseModalities'], modalities)

  if (resolution := getv(from_object, ['media_resolution'])) is not None:
    setv(config, ['mediaResolution'], resolution)

  if (speech_config := getv(from_object, ['speech_config'])) is not None:
    setv(config, ['speechConfig'], t.t_speech_config(speech_config))

  if getv(from_object, ['audio_timestamp']) is not None:
    raise ValueError(
        'audio_timestamp parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  if (thinking_config := getv(from_object, ['thinking_config'])) is not None:
    setv(config, ['thinkingConfig'], thinking_config)

  if (image_config := getv(from_object, ['image_config'])) is not None:
    setv(
        config,
        ['imageConfig'],
        _ImageConfig_to_mldev(image_config, config, root_object),
    )

  civic_answers = getv(from_object, ['enable_enhanced_civic_answers'])
  if civic_answers is not None:
    setv(config, ['enableEnhancedCivicAnswers'], civic_answers)

  if getv(from_object, ['model_armor_config']) is not None:
    raise ValueError(
        'model_armor_config parameter is only supported in Gemini Enterprise'
        ' Agent Platform mode, not in Gemini Developer API mode.'
    )

  if (service_tier := getv(from_object, ['service_tier'])) is not None:
    setv(parent_object, ['serviceTier'], service_tier)

  return config


def _GenerateContentConfig_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a generate-content config into its Vertex request form.

  Generation options are written to the returned dict. The system
  instruction, safety settings, tools, tool config, labels, cached content,
  model armor config and service tier are written to `parent_object` instead.

  Args:
    api_client: Client handed to the schema, tool and cached-content
      transformers.
    from_object: Config to convert.
    parent_object: Dict that receives the request-level fields.
    root_object: Forwarded unchanged to nested converters.

  Returns:
    The dict of generation options.

  Raises:
    ValueError: If `enable_enhanced_civic_answers` is set.
  """
  config: dict[str, Any] = {}

  if (instruction := getv(from_object, ['system_instruction'])) is not None:
    setv(
        parent_object,
        ['systemInstruction'],
        _Content_to_vertex(t.t_content(instruction), config, root_object),
    )

  # Options copied as-is; only the key changes to its camelCase form.
  for source_key, target_key in (
      ('temperature', 'temperature'),
      ('top_p', 'topP'),
      ('top_k', 'topK'),
      ('candidate_count', 'candidateCount'),
      ('max_output_tokens', 'maxOutputTokens'),
      ('stop_sequences', 'stopSequences'),
      ('response_logprobs', 'responseLogprobs'),
      ('logprobs', 'logprobs'),
      ('presence_penalty', 'presencePenalty'),
      ('frequency_penalty', 'frequencyPenalty'),
      ('seed', 'seed'),
      ('response_mime_type', 'responseMimeType'),
  ):
    if (option := getv(from_object, [source_key])) is not None:
      setv(config, [target_key], option)

  if (schema := getv(from_object, ['response_schema'])) is not None:
    setv(config, ['responseSchema'], t.t_schema(api_client, schema))

  # `model_selection_config` is sent under the `modelConfig` key.
  for source_key, target_key in (
      ('response_json_schema', 'responseJsonSchema'),
      ('routing_config', 'routingConfig'),
      ('model_selection_config', 'modelConfig'),
  ):
    if (option := getv(from_object, [source_key])) is not None:
      setv(config, [target_key], option)

  if (safety_settings := getv(from_object, ['safety_settings'])) is not None:
    setv(parent_object, ['safetySettings'], list(safety_settings))

  if (tools := getv(from_object, ['tools'])) is not None:
    converted_tools = [
        _Tool_to_vertex(t.t_tool(api_client, tool), config, root_object)
        for tool in t.t_tools(api_client, tools)
    ]
    setv(parent_object, ['tools'], converted_tools)

  if (tool_config := getv(from_object, ['tool_config'])) is not None:
    setv(
        parent_object,
        ['toolConfig'],
        _ToolConfig_to_vertex(tool_config, config, root_object),
    )

  if (labels := getv(from_object, ['labels'])) is not None:
    setv(parent_object, ['labels'], labels)

  if (cached_content := getv(from_object, ['cached_content'])) is not None:
    setv(
        parent_object,
        ['cachedContent'],
        t.t_cached_content_name(api_client, cached_content),
    )

  if (modalities := getv(from_object, ['response_modalities'])) is not None:
    setv(config, ['responseModalities'], modalities)

  if (resolution := getv(from_object, ['media_resolution'])) is not None:
    setv(config, ['mediaResolution'], resolution)

  if (speech_config := getv(from_object, ['speech_config'])) is not None:
    setv(
        config,
        ['speechConfig'],
        _SpeechConfig_to_vertex(
            t.t_speech_config(speech_config), config, root_object
        ),
    )

  if (audio_timestamp := getv(from_object, ['audio_timestamp'])) is not None:
    setv(config, ['audioTimestamp'], audio_timestamp)

  if (thinking_config := getv(from_object, ['thinking_config'])) is not None:
    setv(config, ['thinkingConfig'], thinking_config)

  if (image_config := getv(from_object, ['image_config'])) is not None:
    setv(
        config,
        ['imageConfig'],
        _ImageConfig_to_vertex(image_config, config, root_object),
    )

  if getv(from_object, ['enable_enhanced_civic_answers']) is not None:
    raise ValueError(
        'enable_enhanced_civic_answers parameter is only supported in Gemini'
        ' Developer API mode, not in Gemini Enterprise Agent Platform mode.'
    )

  if (armor_config := getv(from_object, ['model_armor_config'])) is not None:
    setv(parent_object, ['modelArmorConfig'], armor_config)

  if (service_tier := getv(from_object, ['service_tier'])) is not None:
    setv(parent_object, ['serviceTier'], service_tier)

  return config


def _GenerateContentParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API request for a generate-content call.

  Args:
    api_client: Client handed to the model transformer and config converter.
    from_object: Parameters to read `model`, `contents` and `config` from.
    parent_object: Not read by this function.
    root_object: Forwarded unchanged to nested converters.

  Returns:
    The request dict, with `_url.model`, `contents` and `generationConfig`
    set for each source field that is not None.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  contents = getv(from_object, ['contents'])
  if contents is not None:
    converted_contents = [
        _Content_to_mldev(content, request, root_object)
        for content in t.t_contents(contents)
    ]
    setv(request, ['contents'], converted_contents)

  config = getv(from_object, ['config'])
  if config is not None:
    # `request` is passed as the config converter's parent object.
    generation_config = _GenerateContentConfig_to_mldev(
        api_client, config, request, root_object
    )
    setv(request, ['generationConfig'], generation_config)

  return request


def _GenerateContentParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Vertex request for a generate-content call.

  Args:
    api_client: Client handed to the model transformer and config converter.
    from_object: Parameters to read `model`, `contents` and `config` from.
    parent_object: Not read by this function.
    root_object: Forwarded unchanged to nested converters.

  Returns:
    The request dict, with `_url.model`, `contents` and `generationConfig`
    set for each source field that is not None.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  contents = getv(from_object, ['contents'])
  if contents is not None:
    converted_contents = [
        _Content_to_vertex(content, request, root_object)
        for content in t.t_contents(contents)
    ]
    setv(request, ['contents'], converted_contents)

  config = getv(from_object, ['config'])
  if config is not None:
    # `request` is passed as the config converter's parent object.
    generation_config = _GenerateContentConfig_to_vertex(
        api_client, config, request, root_object
    )
    setv(request, ['generationConfig'], generation_config)

  return request


def _GenerateContentResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Maps an mldev generate-content response onto SDK field names.

  Args:
    from_object: Response to convert.
    parent_object: Not read by this function.
    root_object: Forwarded unchanged to the candidate converter.

  Returns:
    The converted response dict; only fields that are not None in
    `from_object` are set.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  candidates = getv(from_object, ['candidates'])
  if candidates is not None:
    converted_candidates = [
        _Candidate_from_mldev(candidate, response, root_object)
        for candidate in candidates
    ]
    setv(response, ['candidates'], converted_candidates)

  # Remaining fields are copied unchanged under their snake_case names.
  for wire_key, sdk_key in (
      ('modelVersion', 'model_version'),
      ('promptFeedback', 'prompt_feedback'),
      ('responseId', 'response_id'),
      ('usageMetadata', 'usage_metadata'),
      ('modelStatus', 'model_status'),
  ):
    value = getv(from_object, [wire_key])
    if value is not None:
      setv(response, [sdk_key], value)

  return response


def _GenerateContentResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Maps a Vertex generate-content response onto SDK field names.

  Args:
    from_object: Response to convert.
    parent_object: Not read by this function.
    root_object: Not read by this function.

  Returns:
    The converted response dict; only fields that are not None in
    `from_object` are set.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  candidates = getv(from_object, ['candidates'])
  if candidates is not None:
    # Candidates are stored in a new list without per-item conversion.
    setv(response, ['candidates'], list(candidates))

  # Remaining fields are copied unchanged under their snake_case names.
  for wire_key, sdk_key in (
      ('createTime', 'create_time'),
      ('modelVersion', 'model_version'),
      ('promptFeedback', 'prompt_feedback'),
      ('responseId', 'response_id'),
      ('usageMetadata', 'usage_metadata'),
  ):
    value = getv(from_object, [wire_key])
    if value is not None:
      setv(response, [sdk_key], value)

  return response


def _GenerateImagesConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a generate-images config for Gemini Developer API mode.

  Every supported option is written under `parameters` in `parent_object`;
  nothing is written to the returned dict.

  Args:
    from_object: Config to convert.
    parent_object: Dict that receives the `parameters` entries.
    root_object: Not read by this function.

  Returns:
    An empty dict.

  Raises:
    ValueError: If `output_gcs_uri`, `negative_prompt`, `seed`,
      `add_watermark`, `labels` or `enhance_prompt` is set, or if the enum
      validators reject `safety_filter_level` or `person_generation`.
  """
  result: dict[str, Any] = {}

  if getv(from_object, ['output_gcs_uri']) is not None:
    raise ValueError(
        'output_gcs_uri parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  if getv(from_object, ['negative_prompt']) is not None:
    raise ValueError(
        'negative_prompt parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  for source_key, parameter in (
      ('number_of_images', 'sampleCount'),
      ('aspect_ratio', 'aspectRatio'),
      ('guidance_scale', 'guidanceScale'),
  ):
    if (value := getv(from_object, [source_key])) is not None:
      setv(parent_object, ['parameters', parameter], value)

  if getv(from_object, ['seed']) is not None:
    raise ValueError(
        'seed parameter is only supported in Gemini Enterprise Agent Platform'
        ' mode, not in Gemini Developer API mode.'
    )

  # Enum-valued options are validated before they are copied.
  if (filter_level := getv(from_object, ['safety_filter_level'])) is not None:
    _SafetyFilterLevel_to_mldev_enum_validate(filter_level)
    setv(parent_object, ['parameters', 'safetySetting'], filter_level)

  if (person_mode := getv(from_object, ['person_generation'])) is not None:
    _PersonGeneration_to_mldev_enum_validate(person_mode)
    setv(parent_object, ['parameters', 'personGeneration'], person_mode)

  for source_key, sub_path in (
      ('include_safety_attributes', ('includeSafetyAttributes',)),
      ('include_rai_reason', ('includeRaiReason',)),
      ('language', ('language',)),
      ('output_mime_type', ('outputOptions', 'mimeType')),
      ('output_compression_quality', ('outputOptions', 'compressionQuality')),
  ):
    if (value := getv(from_object, [source_key])) is not None:
      setv(parent_object, ['parameters', *sub_path], value)

  if getv(from_object, ['add_watermark']) is not None:
    raise ValueError(
        'add_watermark parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  if getv(from_object, ['labels']) is not None:
    raise ValueError(
        'labels parameter is only supported in Gemini Enterprise Agent Platform'
        ' mode, not in Gemini Developer API mode.'
    )

  if (image_size := getv(from_object, ['image_size'])) is not None:
    setv(parent_object, ['parameters', 'sampleImageSize'], image_size)

  if getv(from_object, ['enhance_prompt']) is not None:
    raise ValueError(
        'enhance_prompt parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  return result


def _GenerateImagesConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a generate-images config into its Vertex request form.

  Each option that is not None is written to `parent_object`: `labels` at
  the top level, everything else under `parameters`. Nothing is written to
  the returned dict.

  Args:
    from_object: Config to convert.
    parent_object: Dict that receives the converted entries.
    root_object: Not read by this function.

  Returns:
    An empty dict.
  """
  result: dict[str, Any] = {}

  # (source field, destination path in `parent_object`), in write order.
  destinations = (
      ('output_gcs_uri', ('parameters', 'storageUri')),
      ('negative_prompt', ('parameters', 'negativePrompt')),
      ('number_of_images', ('parameters', 'sampleCount')),
      ('aspect_ratio', ('parameters', 'aspectRatio')),
      ('guidance_scale', ('parameters', 'guidanceScale')),
      ('seed', ('parameters', 'seed')),
      ('safety_filter_level', ('parameters', 'safetySetting')),
      ('person_generation', ('parameters', 'personGeneration')),
      ('include_safety_attributes', ('parameters', 'includeSafetyAttributes')),
      ('include_rai_reason', ('parameters', 'includeRaiReason')),
      ('language', ('parameters', 'language')),
      ('output_mime_type', ('parameters', 'outputOptions', 'mimeType')),
      (
          'output_compression_quality',
          ('parameters', 'outputOptions', 'compressionQuality'),
      ),
      ('add_watermark', ('parameters', 'addWatermark')),
      ('labels', ('labels',)),
      ('image_size', ('parameters', 'sampleImageSize')),
      ('enhance_prompt', ('parameters', 'enhancePrompt')),
  )
  for source_key, path in destinations:
    value = getv(from_object, [source_key])
    if value is not None:
      setv(parent_object, list(path), value)

  return result


def _GenerateImagesParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API request for a generate-images call.

  Args:
    api_client: Client handed to the model transformer.
    from_object: Parameters to read `model`, `prompt` and `config` from.
    parent_object: Not read by this function.
    root_object: Forwarded unchanged to the config converter.

  Returns:
    The request dict.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(request, ['instances[0]', 'prompt'], prompt)

  config = getv(from_object, ['config'])
  if config is not None:
    # `request` is passed as the parent object; the return value is discarded.
    _GenerateImagesConfig_to_mldev(config, request, root_object)

  return request


def _GenerateImagesParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Vertex request for a generate-images call.

  Args:
    api_client: Client handed to the model transformer.
    from_object: Parameters to read `model`, `prompt` and `config` from.
    parent_object: Not read by this function.
    root_object: Forwarded unchanged to the config converter.

  Returns:
    The request dict.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(request, ['instances[0]', 'prompt'], prompt)

  config = getv(from_object, ['config'])
  if config is not None:
    # `request` is passed as the parent object; the return value is discarded.
    _GenerateImagesConfig_to_vertex(config, request, root_object)

  return request


def _GenerateImagesResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Maps an mldev generate-images response onto SDK field names.

  Args:
    from_object: Response to convert.
    parent_object: Not read by this function.
    root_object: Forwarded unchanged to nested converters.

  Returns:
    A dict with `sdk_http_response`, `generated_images` (from `predictions`)
    and `positive_prompt_safety_attributes` for each source field that is
    not None.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  predictions = getv(from_object, ['predictions'])
  if predictions is not None:
    generated_images = [
        _GeneratedImage_from_mldev(prediction, response, root_object)
        for prediction in predictions
    ]
    setv(response, ['generated_images'], generated_images)

  prompt_safety = getv(from_object, ['positivePromptSafetyAttributes'])
  if prompt_safety is not None:
    setv(
        response,
        ['positive_prompt_safety_attributes'],
        _SafetyAttributes_from_mldev(prompt_safety, response, root_object),
    )

  return response


def _GenerateImagesResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex image response to SDK field names.

  Args:
    from_object: Raw response; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the nested converters.

  Returns:
    A new dict with `sdk_http_response`, `generated_images` and
    `positive_prompt_safety_attributes` for each input field that is set.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  predictions = getv(from_object, ['predictions'])
  if predictions is not None:
    # Each prediction becomes one generated image entry.
    images = [
        _GeneratedImage_from_vertex(prediction, response, root_object)
        for prediction in predictions
    ]
    setv(response, ['generated_images'], images)

  prompt_safety = getv(from_object, ['positivePromptSafetyAttributes'])
  if prompt_safety is not None:
    converted_safety = _SafetyAttributes_from_vertex(
        prompt_safety, response, root_object
    )
    setv(response, ['positive_prompt_safety_attributes'], converted_safety)

  return response


def _GenerateVideosConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes a video-generation config into a Gemini Developer API request.

  Fields are handled in a fixed order: supported ones are written into
  `parent_object`, unsupported ones raise as soon as they are found set.

  Args:
    from_object: Source config; read via `getv`.
    parent_object: Request dict that receives the converted fields.
    root_object: Passed through to the nested image converters.

  Returns:
    The local `to_object` dict. This function never writes to it directly; it
    is only handed to nested converters as their parent object.

  Raises:
    ValueError: If a field that is only supported in Gemini Enterprise Agent
      Platform mode is set.
  """
  to_object: dict[str, Any] = {}

  def copy_parameter(field: str, wire_name: str) -> None:
    # Copies a set field to `parameters.<wire_name>` on the parent request.
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, ['parameters', wire_name], value)

  def reject(field: str) -> None:
    # Fails when a field unsupported in Gemini Developer API mode is set.
    if getv(from_object, [field]) is not None:
      raise ValueError(
          f'{field} parameter is only supported in Gemini Enterprise Agent'
          ' Platform mode, not in Gemini Developer API mode.'
      )

  # The call order below decides which unsupported field is reported first
  # and how much of `parent_object` is filled in before a failure.
  copy_parameter('number_of_videos', 'sampleCount')
  reject('output_gcs_uri')
  reject('fps')
  copy_parameter('duration_seconds', 'durationSeconds')
  reject('seed')
  copy_parameter('aspect_ratio', 'aspectRatio')
  copy_parameter('resolution', 'resolution')
  copy_parameter('person_generation', 'personGeneration')
  reject('pubsub_topic')
  copy_parameter('negative_prompt', 'negativePrompt')
  copy_parameter('enhance_prompt', 'enhancePrompt')
  reject('generate_audio')

  last_frame = getv(from_object, ['last_frame'])
  if last_frame is not None:
    setv(
        parent_object,
        ['instances[0]', 'lastFrame'],
        _Image_to_mldev(last_frame, to_object, root_object),
    )

  reference_images = getv(from_object, ['reference_images'])
  if reference_images is not None:
    converted_references = [
        _VideoGenerationReferenceImage_to_mldev(
            reference, to_object, root_object
        )
        for reference in reference_images
    ]
    setv(
        parent_object,
        ['instances[0]', 'referenceImages'],
        converted_references,
    )

  reject('mask')
  reject('compression_quality')
  reject('labels')

  webhook_config = getv(from_object, ['webhook_config'])
  if webhook_config is not None:
    # Unlike the other fields, this one sits at the top level of the request.
    setv(parent_object, ['webhookConfig'], webhook_config)

  reject('resize_mode')

  return to_object


def _GenerateVideosConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes a video-generation config into a Vertex request.

  Args:
    from_object: Source config; read via `getv`.
    parent_object: Request dict that receives the converted fields.
    root_object: Passed through to the nested image and mask converters.

  Returns:
    The local `to_object` dict. This function never writes to it directly; it
    is only handed to nested converters as their parent object.

  Raises:
    ValueError: If `webhook_config` is set, which is only supported in Gemini
      Developer API mode.
  """
  to_object: dict[str, Any] = {}

  # Plain fields copied to `parameters.<name>`, in request order.
  leading_parameters = (
      ('number_of_videos', 'sampleCount'),
      ('output_gcs_uri', 'storageUri'),
      ('fps', 'fps'),
      ('duration_seconds', 'durationSeconds'),
      ('seed', 'seed'),
      ('aspect_ratio', 'aspectRatio'),
      ('resolution', 'resolution'),
      ('person_generation', 'personGeneration'),
      ('pubsub_topic', 'pubsubTopic'),
      ('negative_prompt', 'negativePrompt'),
      ('enhance_prompt', 'enhancePrompt'),
      ('generate_audio', 'generateAudio'),
  )
  for field, wire_name in leading_parameters:
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, ['parameters', wire_name], value)

  last_frame = getv(from_object, ['last_frame'])
  if last_frame is not None:
    setv(
        parent_object,
        ['instances[0]', 'lastFrame'],
        _Image_to_vertex(last_frame, to_object, root_object),
    )

  reference_images = getv(from_object, ['reference_images'])
  if reference_images is not None:
    converted_references = [
        _VideoGenerationReferenceImage_to_vertex(
            reference, to_object, root_object
        )
        for reference in reference_images
    ]
    setv(
        parent_object,
        ['instances[0]', 'referenceImages'],
        converted_references,
    )

  mask = getv(from_object, ['mask'])
  if mask is not None:
    setv(
        parent_object,
        ['instances[0]', 'mask'],
        _VideoGenerationMask_to_vertex(mask, to_object, root_object),
    )

  compression_quality = getv(from_object, ['compression_quality'])
  if compression_quality is not None:
    setv(
        parent_object,
        ['parameters', 'compressionQuality'],
        compression_quality,
    )

  labels = getv(from_object, ['labels'])
  if labels is not None:
    # Labels sit at the top level of the request, not under `parameters`.
    setv(parent_object, ['labels'], labels)

  if getv(from_object, ['webhook_config']) is not None:
    raise ValueError(
        'webhook_config parameter is only supported in Gemini Developer API'
        ' mode, not in Gemini Enterprise Agent Platform mode.'
    )

  resize_mode = getv(from_object, ['resize_mode'])
  if resize_mode is not None:
    setv(parent_object, ['parameters', 'resizeMode'], resize_mode)

  return to_object


def _GenerateVideosOperation_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Gemini Developer API video operation to SDK field names.

  Args:
    from_object: Raw operation; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the response converter.

  Returns:
    A new dict with `name`, `metadata`, `done` and `error` copied when set,
    and the converted `response.generateVideoResponse` payload stored under
    both `response` and `result`.
  """
  operation: dict[str, Any] = {}

  # These fields keep the same name on both sides.
  for key in ('name', 'metadata', 'done', 'error'):
    value = getv(from_object, [key])
    if value is not None:
      setv(operation, [key], value)

  # The payload is converted once per target, so `response` and `result`
  # each receive their own converted dict.
  for target in ('response', 'result'):
    payload = getv(from_object, ['response', 'generateVideoResponse'])
    if payload is not None:
      setv(
          operation,
          [target],
          _GenerateVideosResponse_from_mldev(payload, operation, root_object),
      )

  return operation


def _GenerateVideosOperation_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex video operation to SDK field names.

  Args:
    from_object: Raw operation; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the response converter.

  Returns:
    A new dict with `name`, `metadata`, `done` and `error` copied when set,
    and the converted `response` payload stored under both `response` and
    `result`.
  """
  operation: dict[str, Any] = {}

  # These fields keep the same name on both sides.
  for key in ('name', 'metadata', 'done', 'error'):
    value = getv(from_object, [key])
    if value is not None:
      setv(operation, [key], value)

  # The payload is converted once per target, so `response` and `result`
  # each receive their own converted dict.
  for target in ('response', 'result'):
    payload = getv(from_object, ['response'])
    if payload is not None:
      setv(
          operation,
          [target],
          _GenerateVideosResponse_from_vertex(payload, operation, root_object),
      )

  return operation


def _GenerateVideosParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API request dict for generate-videos.

  Args:
    api_client: Client handed to `t.t_model` together with the model value.
    from_object: Source parameters; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the nested converters.

  Returns:
    A new dict carrying `_url.model` and the `instances[0]` prompt, image and
    video when set, plus whatever the source and config converters add.
  """
  request: dict[str, Any] = {}

  if (model := getv(from_object, ['model'])) is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  if (prompt := getv(from_object, ['prompt'])) is not None:
    setv(request, ['instances[0]', 'prompt'], prompt)

  if (image := getv(from_object, ['image'])) is not None:
    setv(
        request,
        ['instances[0]', 'image'],
        _Image_to_mldev(image, request, root_object),
    )

  if (video := getv(from_object, ['video'])) is not None:
    setv(
        request,
        ['instances[0]', 'video'],
        _Video_to_mldev(video, request, root_object),
    )

  # The next two converters receive `request` as their parent object; their
  # return values are ignored.
  if (source := getv(from_object, ['source'])) is not None:
    _GenerateVideosSource_to_mldev(source, request, root_object)

  if (config := getv(from_object, ['config'])) is not None:
    _GenerateVideosConfig_to_mldev(config, request, root_object)

  return request


def _GenerateVideosParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Vertex request dict for generate-videos.

  Args:
    api_client: Client handed to `t.t_model` together with the model value.
    from_object: Source parameters; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the nested converters.

  Returns:
    A new dict carrying `_url.model` and the `instances[0]` prompt, image and
    video when set, plus whatever the source and config converters add.
  """
  request: dict[str, Any] = {}

  if (model := getv(from_object, ['model'])) is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  if (prompt := getv(from_object, ['prompt'])) is not None:
    setv(request, ['instances[0]', 'prompt'], prompt)

  if (image := getv(from_object, ['image'])) is not None:
    setv(
        request,
        ['instances[0]', 'image'],
        _Image_to_vertex(image, request, root_object),
    )

  if (video := getv(from_object, ['video'])) is not None:
    setv(
        request,
        ['instances[0]', 'video'],
        _Video_to_vertex(video, request, root_object),
    )

  # The next two converters receive `request` as their parent object; their
  # return values are ignored.
  if (source := getv(from_object, ['source'])) is not None:
    _GenerateVideosSource_to_vertex(source, request, root_object)

  if (config := getv(from_object, ['config'])) is not None:
    _GenerateVideosConfig_to_vertex(config, request, root_object)

  return request


def _GenerateVideosResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Gemini Developer API video response to SDK field names.

  Args:
    from_object: Raw response; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the per-video converter.

  Returns:
    A new dict with `generated_videos` (built from `generatedSamples`),
    `rai_media_filtered_count` and `rai_media_filtered_reasons` for each
    input field that is set.
  """
  response: dict[str, Any] = {}

  samples = getv(from_object, ['generatedSamples'])
  if samples is not None:
    videos = [
        _GeneratedVideo_from_mldev(sample, response, root_object)
        for sample in samples
    ]
    setv(response, ['generated_videos'], videos)

  filtered_count = getv(from_object, ['raiMediaFilteredCount'])
  if filtered_count is not None:
    setv(response, ['rai_media_filtered_count'], filtered_count)

  filtered_reasons = getv(from_object, ['raiMediaFilteredReasons'])
  if filtered_reasons is not None:
    setv(response, ['rai_media_filtered_reasons'], filtered_reasons)

  return response


def _GenerateVideosResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex video response to SDK field names.

  Args:
    from_object: Raw response; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the per-video converter.

  Returns:
    A new dict with `generated_videos` (built from `videos`),
    `rai_media_filtered_count` and `rai_media_filtered_reasons` for each
    input field that is set.
  """
  response: dict[str, Any] = {}

  raw_videos = getv(from_object, ['videos'])
  if raw_videos is not None:
    videos = [
        _GeneratedVideo_from_vertex(raw_video, response, root_object)
        for raw_video in raw_videos
    ]
    setv(response, ['generated_videos'], videos)

  filtered_count = getv(from_object, ['raiMediaFilteredCount'])
  if filtered_count is not None:
    setv(response, ['rai_media_filtered_count'], filtered_count)

  filtered_reasons = getv(from_object, ['raiMediaFilteredReasons'])
  if filtered_reasons is not None:
    setv(response, ['rai_media_filtered_reasons'], filtered_reasons)

  return response


def _GenerateVideosSource_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes a video source into a Gemini Developer API request.

  Args:
    from_object: Source object; read via `getv`.
    parent_object: Request dict that receives the `instances[0]` fields.
    root_object: Passed through to the image and video converters.

  Returns:
    The local `to_object` dict. This function never writes to it directly; it
    is only handed to nested converters as their parent object.
  """
  to_object: dict[str, Any] = {}

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(parent_object, ['instances[0]', 'prompt'], prompt)

  image = getv(from_object, ['image'])
  if image is not None:
    converted_image = _Image_to_mldev(image, to_object, root_object)
    setv(parent_object, ['instances[0]', 'image'], converted_image)

  video = getv(from_object, ['video'])
  if video is not None:
    converted_video = _Video_to_mldev(video, to_object, root_object)
    setv(parent_object, ['instances[0]', 'video'], converted_video)

  return to_object


def _GenerateVideosSource_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes a video source into a Vertex request.

  Args:
    from_object: Source object; read via `getv`.
    parent_object: Request dict that receives the `instances[0]` fields.
    root_object: Passed through to the image and video converters.

  Returns:
    The local `to_object` dict. This function never writes to it directly; it
    is only handed to nested converters as their parent object.
  """
  to_object: dict[str, Any] = {}

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(parent_object, ['instances[0]', 'prompt'], prompt)

  image = getv(from_object, ['image'])
  if image is not None:
    converted_image = _Image_to_vertex(image, to_object, root_object)
    setv(parent_object, ['instances[0]', 'image'], converted_image)

  video = getv(from_object, ['video'])
  if video is not None:
    converted_video = _Video_to_vertex(video, to_object, root_object)
    setv(parent_object, ['instances[0]', 'video'], converted_video)

  return to_object


def _GeneratedImageMask_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex generated image mask to SDK field names.

  Args:
    from_object: Raw mask entry; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the image converter.

  Returns:
    A new dict with `mask` (converted from the `_self` lookup) and a list
    copy of `labels`, for each input that is set.
  """
  converted: dict[str, Any] = {}

  # NOTE(review): `_self` presumably resolves to `from_object` itself; confirm
  # against `getv`.
  whole = getv(from_object, ['_self'])
  if whole is not None:
    setv(
        converted,
        ['mask'],
        _Image_from_vertex(whole, converted, root_object),
    )

  labels = getv(from_object, ['labels'])
  if labels is not None:
    # Stored as a fresh list, not the iterable that was read.
    setv(converted, ['labels'], list(labels))

  return converted


def _GeneratedImage_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Gemini Developer API prediction to a generated image.

  Args:
    from_object: Raw prediction; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the nested converters.

  Returns:
    A new dict with `image`, `rai_filtered_reason` and `safety_attributes`
    for each input that is set. `image` and `safety_attributes` are both
    derived from the `_self` lookup.
  """
  generated: dict[str, Any] = {}

  # NOTE(review): `_self` presumably resolves to `from_object` itself; confirm
  # against `getv`.
  image_source = getv(from_object, ['_self'])
  if image_source is not None:
    setv(
        generated,
        ['image'],
        _Image_from_mldev(image_source, generated, root_object),
    )

  filtered_reason = getv(from_object, ['raiFilteredReason'])
  if filtered_reason is not None:
    setv(generated, ['rai_filtered_reason'], filtered_reason)

  safety_source = getv(from_object, ['_self'])
  if safety_source is not None:
    setv(
        generated,
        ['safety_attributes'],
        _SafetyAttributes_from_mldev(safety_source, generated, root_object),
    )

  return generated


def _GeneratedImage_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex prediction to a generated image.

  Args:
    from_object: Raw prediction; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the nested converters.

  Returns:
    A new dict with `image`, `rai_filtered_reason`, `safety_attributes` and
    `enhanced_prompt` (from `prompt`) for each input that is set. `image` and
    `safety_attributes` are both derived from the `_self` lookup.
  """
  generated: dict[str, Any] = {}

  # NOTE(review): `_self` presumably resolves to `from_object` itself; confirm
  # against `getv`.
  image_source = getv(from_object, ['_self'])
  if image_source is not None:
    setv(
        generated,
        ['image'],
        _Image_from_vertex(image_source, generated, root_object),
    )

  filtered_reason = getv(from_object, ['raiFilteredReason'])
  if filtered_reason is not None:
    setv(generated, ['rai_filtered_reason'], filtered_reason)

  safety_source = getv(from_object, ['_self'])
  if safety_source is not None:
    setv(
        generated,
        ['safety_attributes'],
        _SafetyAttributes_from_vertex(safety_source, generated, root_object),
    )

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(generated, ['enhanced_prompt'], prompt)

  return generated


def _GeneratedVideo_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Gemini Developer API generated sample to a generated video.

  Args:
    from_object: Raw sample; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the video converter.

  Returns:
    A new dict with `video` converted from the sample's `video` field, or an
    empty dict when that field is not set.
  """
  generated: dict[str, Any] = {}

  raw_video = getv(from_object, ['video'])
  if raw_video is None:
    return generated

  setv(
      generated,
      ['video'],
      _Video_from_mldev(raw_video, generated, root_object),
  )
  return generated


def _GeneratedVideo_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex video entry to a generated video.

  Args:
    from_object: Raw video entry; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the video converter.

  Returns:
    A new dict with `video` converted from the `_self` lookup, or an empty
    dict when that lookup yields None.
  """
  generated: dict[str, Any] = {}

  # NOTE(review): `_self` presumably resolves to `from_object` itself; confirm
  # against `getv`.
  raw_video = getv(from_object, ['_self'])
  if raw_video is None:
    return generated

  setv(
      generated,
      ['video'],
      _Video_from_vertex(raw_video, generated, root_object),
  )
  return generated


def _GenerationConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a generation config to Vertex field names.

  Args:
    from_object: Source config; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the speech config converter.

  Returns:
    A new dict holding every set field under its Vertex name.

  Raises:
    ValueError: If `enable_enhanced_civic_answers` is set, which is only
      supported in Gemini Developer API mode.
  """
  config: dict[str, Any] = {}

  def copy_fields(pairs: tuple[tuple[str, str], ...]) -> None:
    # Copies each set field to its Vertex name, in the order given.
    for field, wire_name in pairs:
      value = getv(from_object, [field])
      if value is not None:
        setv(config, [wire_name], value)

  copy_fields((
      ('model_selection_config', 'modelConfig'),
      ('response_json_schema', 'responseJsonSchema'),
      ('audio_timestamp', 'audioTimestamp'),
      ('candidate_count', 'candidateCount'),
      ('enable_affective_dialog', 'enableAffectiveDialog'),
      ('frequency_penalty', 'frequencyPenalty'),
      ('logprobs', 'logprobs'),
      ('max_output_tokens', 'maxOutputTokens'),
      ('media_resolution', 'mediaResolution'),
      ('presence_penalty', 'presencePenalty'),
      ('response_logprobs', 'responseLogprobs'),
      ('response_mime_type', 'responseMimeType'),
      ('response_modalities', 'responseModalities'),
      ('response_schema', 'responseSchema'),
      ('routing_config', 'routingConfig'),
      ('seed', 'seed'),
  ))

  # The speech config is the only field that needs its own converter; it is
  # handled here so the output keeps the original field order.
  speech_config = getv(from_object, ['speech_config'])
  if speech_config is not None:
    setv(
        config,
        ['speechConfig'],
        _SpeechConfig_to_vertex(speech_config, config, root_object),
    )

  copy_fields((
      ('stop_sequences', 'stopSequences'),
      ('temperature', 'temperature'),
      ('thinking_config', 'thinkingConfig'),
      ('top_k', 'topK'),
      ('top_p', 'topP'),
  ))

  if getv(from_object, ['enable_enhanced_civic_answers']) is not None:
    raise ValueError(
        'enable_enhanced_civic_answers parameter is only supported in Gemini'
        ' Developer API mode, not in Gemini Enterprise Agent Platform mode.'
    )

  return config


def _GetModelParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API request dict for a get-model call.

  Args:
    api_client: Client handed to `t.t_model` together with the model value.
    from_object: Source parameters; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `_url.name` set from the model, or an empty dict when no
    model is given.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is None:
    return request

  setv(request, ['_url', 'name'], t.t_model(api_client, model))
  return request


def _GetModelParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Vertex request dict for a get-model call.

  Args:
    api_client: Client handed to `t.t_model` together with the model value.
    from_object: Source parameters; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `_url.name` set from the model, or an empty dict when no
    model is given.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is None:
    return request

  setv(request, ['_url', 'name'], t.t_model(api_client, model))
  return request


def _GoogleMaps_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Google Maps tool config to Gemini Developer API field names.

  Args:
    from_object: Source config; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Passed through to the auth config converter.

  Returns:
    A new dict with `authConfig` and `enableWidget` for each input field that
    is set.
  """
  maps_config: dict[str, Any] = {}

  auth_config = getv(from_object, ['auth_config'])
  if auth_config is not None:
    converted_auth = _AuthConfig_to_mldev(
        auth_config, maps_config, root_object
    )
    setv(maps_config, ['authConfig'], converted_auth)

  enable_widget = getv(from_object, ['enable_widget'])
  if enable_widget is not None:
    setv(maps_config, ['enableWidget'], enable_widget)

  return maps_config


def _GoogleSearch_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Google Search tool config to Gemini Developer API names.

  Args:
    from_object: Source config; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `searchTypes` and `timeRangeFilter` for each input field
    that is set.

  Raises:
    ValueError: If `blocking_confidence` or `exclude_domains` is set; both
      are only supported in Gemini Enterprise Agent Platform mode.
  """
  search_config: dict[str, Any] = {}

  search_types = getv(from_object, ['search_types'])
  if search_types is not None:
    setv(search_config, ['searchTypes'], search_types)

  # Checked in this order, so `blocking_confidence` is reported first when
  # both unsupported fields are set.
  for unsupported in ('blocking_confidence', 'exclude_domains'):
    if getv(from_object, [unsupported]) is not None:
      raise ValueError(
          f'{unsupported} parameter is only supported in Gemini Enterprise'
          ' Agent Platform mode, not in Gemini Developer API mode.'
      )

  time_range_filter = getv(from_object, ['time_range_filter'])
  if time_range_filter is not None:
    setv(search_config, ['timeRangeFilter'], time_range_filter)

  return search_config


def _ImageConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts an image config to Gemini Developer API field names.

  Args:
    from_object: Source config; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `aspectRatio` and `imageSize` for each input field that
    is set.

  Raises:
    ValueError: If a field that is only supported in Gemini Enterprise Agent
      Platform mode is set.
  """
  image_config: dict[str, Any] = {}

  aspect_ratio = getv(from_object, ['aspect_ratio'])
  if aspect_ratio is not None:
    setv(image_config, ['aspectRatio'], aspect_ratio)

  image_size = getv(from_object, ['image_size'])
  if image_size is not None:
    setv(image_config, ['imageSize'], image_size)

  # Checked in this order; the first set field is the one reported.
  unsupported_fields = (
      'person_generation',
      'prominent_people',
      'output_mime_type',
      'output_compression_quality',
      'image_output_options',
  )
  for unsupported in unsupported_fields:
    if getv(from_object, [unsupported]) is not None:
      raise ValueError(
          f'{unsupported} parameter is only supported in Gemini Enterprise'
          ' Agent Platform mode, not in Gemini Developer API mode.'
      )

  return image_config


def _ImageConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts an image config to Vertex field names.

  Args:
    from_object: Source config; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict holding every set field under its Vertex path.
  """
  image_config: dict[str, Any] = {}

  # Source field -> destination path, applied in this order. Three entries
  # target `imageOutputOptions`: two of its sub-keys first, then the whole
  # key. How `setv` combines them is not visible from this function.
  field_paths = (
      ('aspect_ratio', ['aspectRatio']),
      ('image_size', ['imageSize']),
      ('person_generation', ['personGeneration']),
      ('prominent_people', ['prominentPeople']),
      ('output_mime_type', ['imageOutputOptions', 'mimeType']),
      (
          'output_compression_quality',
          ['imageOutputOptions', 'compressionQuality'],
      ),
      ('image_output_options', ['imageOutputOptions']),
  )
  for field, destination in field_paths:
    value = getv(from_object, [field])
    if value is not None:
      setv(image_config, destination, value)

  return image_config


def _Image_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Gemini Developer API image payload to SDK field names.

  Args:
    from_object: Raw image payload; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `image_bytes` (`bytesBase64Encoded` passed through
    `base_t.t_bytes`) and `mime_type` for each input field that is set.
  """
  image: dict[str, Any] = {}

  encoded = getv(from_object, ['bytesBase64Encoded'])
  if encoded is not None:
    setv(image, ['image_bytes'], base_t.t_bytes(encoded))

  mime_type = getv(from_object, ['mimeType'])
  if mime_type is not None:
    setv(image, ['mime_type'], mime_type)

  return image


def _Image_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex image payload to SDK field names.

  Args:
    from_object: Raw image payload; read via `getv`.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `gcs_uri`, `image_bytes` (`bytesBase64Encoded` passed
    through `base_t.t_bytes`) and `mime_type` for each input field that is
    set.
  """
  image: dict[str, Any] = {}

  gcs_uri = getv(from_object, ['gcsUri'])
  if gcs_uri is not None:
    setv(image, ['gcs_uri'], gcs_uri)

  encoded = getv(from_object, ['bytesBase64Encoded'])
  if encoded is not None:
    setv(image, ['image_bytes'], base_t.t_bytes(encoded))

  mime_type = getv(from_object, ['mimeType'])
  if mime_type is not None:
    setv(image, ['mime_type'], mime_type)

  return image


def _Image_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API (mldev) request form of an image.

  Args:
    from_object: SDK image; `gcs_uri`, `image_bytes` and `mime_type` are read.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `bytesBase64Encoded` (passed through `base_t.t_bytes`)
    and `mimeType`, each written only when the source value is not None.

  Raises:
    ValueError: If `gcs_uri` is set; the check runs before any field is
      copied.
  """
  gcs_uri = getv(from_object, ['gcs_uri'])
  if gcs_uri is not None:
    raise ValueError(
        'gcs_uri parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  payload: dict[str, Any] = {}

  raw_bytes = getv(from_object, ['image_bytes'])
  if raw_bytes is not None:
    setv(payload, ['bytesBase64Encoded'], base_t.t_bytes(raw_bytes))

  mime = getv(from_object, ['mime_type'])
  if mime is not None:
    setv(payload, ['mimeType'], mime)

  return payload


def _Image_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex-mode request form of an image.

  Args:
    from_object: SDK image; `gcs_uri`, `image_bytes` and `mime_type` are read.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `gcsUri`, `bytesBase64Encoded` (passed through
    `base_t.t_bytes`) and `mimeType`, each written only when the source
    value is not None.
  """
  payload: dict[str, Any] = {}

  uri = getv(from_object, ['gcs_uri'])
  if uri is not None:
    setv(payload, ['gcsUri'], uri)

  raw_bytes = getv(from_object, ['image_bytes'])
  if raw_bytes is not None:
    setv(payload, ['bytesBase64Encoded'], base_t.t_bytes(raw_bytes))

  mime = getv(from_object, ['mime_type'])
  if mime is not None:
    setv(payload, ['mimeType'], mime)

  return payload


def _ListModelsConfig_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes list-models config options into the parent request (mldev).

  `page_size`, `page_token` and `filter` are stored under the parent's
  `_query` key; `query_base` is run through `t.t_models_url` and stored at
  `_url.models_url`. Fields whose value is None are skipped.

  Args:
    api_client: Client forwarded to `t.t_models_url`.
    from_object: The list-models config to read.
    parent_object: Request dict that receives every output of this function.
    root_object: Not read by this converter.

  Returns:
    A new dict that this function never writes to.
  """
  query_fields = (
      ('page_size', 'pageSize'),
      ('page_token', 'pageToken'),
      ('filter', 'filter'),
  )
  for field, query_key in query_fields:
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, ['_query', query_key], value)

  query_base = getv(from_object, ['query_base'])
  if query_base is not None:
    models_url = t.t_models_url(api_client, query_base)
    setv(parent_object, ['_url', 'models_url'], models_url)

  unused_config: dict[str, Any] = {}
  return unused_config


def _ListModelsConfig_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes list-models config options into the parent request (vertex).

  `page_size`, `page_token` and `filter` are stored under the parent's
  `_query` key; `query_base` is run through `t.t_models_url` and stored at
  `_url.models_url`. Fields whose value is None are skipped.

  Args:
    api_client: Client forwarded to `t.t_models_url`.
    from_object: The list-models config to read.
    parent_object: Request dict that receives every output of this function.
    root_object: Not read by this converter.

  Returns:
    A new dict that this function never writes to.
  """
  page_size = getv(from_object, ['page_size'])
  if page_size is not None:
    setv(parent_object, ['_query', 'pageSize'], page_size)

  page_token = getv(from_object, ['page_token'])
  if page_token is not None:
    setv(parent_object, ['_query', 'pageToken'], page_token)

  model_filter = getv(from_object, ['filter'])
  if model_filter is not None:
    setv(parent_object, ['_query', 'filter'], model_filter)

  query_base = getv(from_object, ['query_base'])
  if query_base is not None:
    models_url = t.t_models_url(api_client, query_base)
    setv(parent_object, ['_url', 'models_url'], models_url)

  unused_config: dict[str, Any] = {}
  return unused_config


def _ListModelsParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the list-models request dict for mldev mode.

  Args:
    api_client: Client forwarded to the config converter.
    from_object: Parameters object; only `config` is read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the config converter.

  Returns:
    The request dict. It is handed to `_ListModelsConfig_to_mldev` as that
    converter's parent object; the converter's own return value is discarded.
  """
  request: dict[str, Any] = {}

  config = getv(from_object, ['config'])
  if config is not None:
    _ListModelsConfig_to_mldev(api_client, config, request, root_object)

  return request


def _ListModelsParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the list-models request dict for vertex mode.

  Args:
    api_client: Client forwarded to the config converter.
    from_object: Parameters object; only `config` is read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the config converter.

  Returns:
    The request dict. It is handed to `_ListModelsConfig_to_vertex` as that
    converter's parent object; the converter's own return value is discarded.
  """
  request: dict[str, Any] = {}

  config = getv(from_object, ['config'])
  if config is not None:
    _ListModelsConfig_to_vertex(api_client, config, request, root_object)

  return request


def _ListModelsResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts an mldev list-models response to SDK field names.

  Args:
    from_object: Raw response; `sdkHttpResponse`, `nextPageToken` and the
      `_self` path are read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_Model_from_mldev`.

  Returns:
    A new dict with `sdk_http_response`, `next_page_token` and `models`.
    `models` holds one `_Model_from_mldev` result per item yielded by
    `t.t_extract_models`.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  next_token = getv(from_object, ['nextPageToken'])
  if next_token is not None:
    setv(response, ['next_page_token'], next_token)

  whole_payload = getv(from_object, ['_self'])
  if whole_payload is not None:
    models = [
        _Model_from_mldev(entry, response, root_object)
        for entry in t.t_extract_models(whole_payload)
    ]
    setv(response, ['models'], models)

  return response


def _ListModelsResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a vertex list-models response to SDK field names.

  Args:
    from_object: Raw response; `sdkHttpResponse`, `nextPageToken` and the
      `_self` path are read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_Model_from_vertex`.

  Returns:
    A new dict with `sdk_http_response`, `next_page_token` and `models`.
    `models` holds one `_Model_from_vertex` result per item yielded by
    `t.t_extract_models`.
  """
  response: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(response, ['sdk_http_response'], http_response)

  next_token = getv(from_object, ['nextPageToken'])
  if next_token is not None:
    setv(response, ['next_page_token'], next_token)

  whole_payload = getv(from_object, ['_self'])
  if whole_payload is not None:
    models = [
        _Model_from_vertex(entry, response, root_object)
        for entry in t.t_extract_models(whole_payload)
    ]
    setv(response, ['models'], models)

  return response


def _MaskReferenceConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Renames mask reference config fields to their vertex request keys.

  Args:
    from_object: Config to read.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict mapping `mask_mode` -> `maskMode`, `segmentation_classes` ->
    `maskClasses` and `mask_dilation` -> `dilation`; None values are skipped.
  """
  renames = (
      ('mask_mode', 'maskMode'),
      ('segmentation_classes', 'maskClasses'),
      ('mask_dilation', 'dilation'),
  )

  mask_config: dict[str, Any] = {}
  for src, dst in renames:
    value = getv(from_object, [src])
    if value is not None:
      setv(mask_config, [dst], value)

  return mask_config


def _McpServer_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Rejects MCP server fields, none of which are emitted in vertex mode.

  Args:
    from_object: MCP server config; `name` and `streamable_http_transport`
      are checked, in that order.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    An empty dict when neither field is set.

  Raises:
    ValueError: For the first of the two fields that is not None.
  """
  for field in ('name', 'streamable_http_transport'):
    if getv(from_object, [field]) is not None:
      raise ValueError(
          f'{field} parameter is only supported in Gemini Developer API mode,'
          ' not in Gemini Enterprise Agent Platform mode.'
      )

  empty: dict[str, Any] = {}
  return empty


def _Model_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts an mldev model payload to SDK field names.

  Every field is a plain pass-through; only the key name changes. The
  `_self` path is copied as-is into `tuned_model_info`.

  Args:
    from_object: Raw model payload.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict containing the renamed fields whose source value is not None.
  """
  # (source key, destination key), applied in this order.
  renames = (
      ('name', 'name'),
      ('displayName', 'display_name'),
      ('description', 'description'),
      ('version', 'version'),
      ('_self', 'tuned_model_info'),
      ('inputTokenLimit', 'input_token_limit'),
      ('outputTokenLimit', 'output_token_limit'),
      ('supportedGenerationMethods', 'supported_actions'),
      ('temperature', 'temperature'),
      ('maxTemperature', 'max_temperature'),
      ('topP', 'top_p'),
      ('topK', 'top_k'),
      ('thinking', 'thinking'),
  )

  model: dict[str, Any] = {}
  for src, dst in renames:
    value = getv(from_object, [src])
    if value is not None:
      setv(model, [dst], value)

  return model


def _Model_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a vertex model payload to SDK field names.

  Args:
    from_object: Raw model payload.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict with `name`, `display_name`, `description`, `version` (from
    `versionId`), `endpoints` (each `deployedModels` item converted with
    `_Endpoint_from_vertex`), `labels`, `tuned_model_info` (the `_self`
    path converted with `_TunedModelInfo_from_vertex`),
    `default_checkpoint_id` and `checkpoints` (a new list of the source
    items). Fields whose source value is None are skipped.
  """
  model: dict[str, Any] = {}

  def _copy(src: str, dst: str) -> None:
    # Pass-through: only the key name changes.
    value = getv(from_object, [src])
    if value is not None:
      setv(model, [dst], value)

  _copy('name', 'name')
  _copy('displayName', 'display_name')
  _copy('description', 'description')
  _copy('versionId', 'version')

  deployed = getv(from_object, ['deployedModels'])
  if deployed is not None:
    endpoints = [
        _Endpoint_from_vertex(entry, model, root_object) for entry in deployed
    ]
    setv(model, ['endpoints'], endpoints)

  _copy('labels', 'labels')

  whole_payload = getv(from_object, ['_self'])
  if whole_payload is not None:
    tuned_info = _TunedModelInfo_from_vertex(whole_payload, model, root_object)
    setv(model, ['tuned_model_info'], tuned_info)

  _copy('defaultCheckpointId', 'default_checkpoint_id')

  checkpoints = getv(from_object, ['checkpoints'])
  if checkpoints is not None:
    setv(model, ['checkpoints'], list(checkpoints))

  return model


def _MultiSpeakerVoiceConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a multi-speaker voice config.

  Args:
    from_object: Config to read; only `speaker_voice_configs` is used.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_SpeakerVoiceConfig_to_vertex`.

  Returns:
    A new dict whose `speakerVoiceConfigs` holds one converted entry per
    source item; empty when the source field is None.
  """
  voice_config: dict[str, Any] = {}

  speakers = getv(from_object, ['speaker_voice_configs'])
  if speakers is not None:
    converted = [
        _SpeakerVoiceConfig_to_vertex(entry, voice_config, root_object)
        for entry in speakers
    ]
    setv(voice_config, ['speakerVoiceConfigs'], converted)

  return voice_config


def _Part_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API (mldev) request form of a content part.

  `file_data`, `function_call` and `inline_data` go through their own
  `_to_mldev` converters; every other field is copied under its camelCase
  key. Fields are processed in a fixed order and None values are skipped.

  Args:
    from_object: SDK part to read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict holding the converted part.
  """
  part: dict[str, Any] = {}

  def _copy(src: str, dst: str) -> None:
    # Pass-through: only the key name changes.
    value = getv(from_object, [src])
    if value is not None:
      setv(part, [dst], value)

  _copy('media_resolution', 'mediaResolution')
  _copy('code_execution_result', 'codeExecutionResult')
  _copy('executable_code', 'executableCode')

  file_data = getv(from_object, ['file_data'])
  if file_data is not None:
    setv(part, ['fileData'], _FileData_to_mldev(file_data, part, root_object))

  function_call = getv(from_object, ['function_call'])
  if function_call is not None:
    setv(
        part,
        ['functionCall'],
        _FunctionCall_to_mldev(function_call, part, root_object),
    )

  _copy('function_response', 'functionResponse')

  inline_data = getv(from_object, ['inline_data'])
  if inline_data is not None:
    setv(part, ['inlineData'], _Blob_to_mldev(inline_data, part, root_object))

  for src, dst in (
      ('text', 'text'),
      ('thought', 'thought'),
      ('thought_signature', 'thoughtSignature'),
      ('video_metadata', 'videoMetadata'),
      ('tool_call', 'toolCall'),
      ('tool_response', 'toolResponse'),
      ('part_metadata', 'partMetadata'),
  ):
    _copy(src, dst)

  return part


def _Part_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a content part.

  `code_execution_result` and `executable_code` go through their own
  `_to_vertex` converters; the remaining supported fields are copied under
  their camelCase keys. Fields are processed in a fixed order and None
  values are skipped.

  Args:
    from_object: SDK part to read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict holding the converted part.

  Raises:
    ValueError: If `tool_call`, `tool_response` or `part_metadata` is set.
      These checks run after all supported fields have been converted.
  """
  part: dict[str, Any] = {}

  def _copy(src: str, dst: str) -> None:
    # Pass-through: only the key name changes.
    value = getv(from_object, [src])
    if value is not None:
      setv(part, [dst], value)

  _copy('media_resolution', 'mediaResolution')

  execution_result = getv(from_object, ['code_execution_result'])
  if execution_result is not None:
    setv(
        part,
        ['codeExecutionResult'],
        _CodeExecutionResult_to_vertex(execution_result, part, root_object),
    )

  executable_code = getv(from_object, ['executable_code'])
  if executable_code is not None:
    setv(
        part,
        ['executableCode'],
        _ExecutableCode_to_vertex(executable_code, part, root_object),
    )

  for src, dst in (
      ('file_data', 'fileData'),
      ('function_call', 'functionCall'),
      ('function_response', 'functionResponse'),
      ('inline_data', 'inlineData'),
      ('text', 'text'),
      ('thought', 'thought'),
      ('thought_signature', 'thoughtSignature'),
      ('video_metadata', 'videoMetadata'),
  ):
    _copy(src, dst)

  for field in ('tool_call', 'tool_response', 'part_metadata'):
    if getv(from_object, [field]) is not None:
      raise ValueError(
          f'{field} parameter is only supported in Gemini Developer API mode,'
          ' not in Gemini Enterprise Agent Platform mode.'
      )

  return part


def _ProductImage_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a product image wrapper.

  Args:
    from_object: Wrapper to read; only `product_image` is used.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_Image_to_vertex`.

  Returns:
    A new dict whose `image` key holds the converted image; empty when
    `product_image` is None.
  """
  wrapper: dict[str, Any] = {}

  product_image = getv(from_object, ['product_image'])
  if product_image is not None:
    converted = _Image_to_vertex(product_image, wrapper, root_object)
    setv(wrapper, ['image'], converted)

  return wrapper


def _RecontextImageConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes recontext-image config options into the parent request.

  Each config field is stored at a fixed path in `parent_object`: most go
  under `parameters`, the two output options go under
  `parameters.outputOptions`, and `labels` goes at the top level. None
  values are skipped.

  Args:
    from_object: Config to read.
    parent_object: Request dict that receives every output of this function.
    root_object: Not read by this converter.

  Returns:
    A new dict that this function never writes to.
  """
  # (config field, destination path in the parent), applied in this order.
  destinations = (
      ('number_of_images', ['parameters', 'sampleCount']),
      ('base_steps', ['parameters', 'baseSteps']),
      ('output_gcs_uri', ['parameters', 'storageUri']),
      ('seed', ['parameters', 'seed']),
      ('safety_filter_level', ['parameters', 'safetySetting']),
      ('person_generation', ['parameters', 'personGeneration']),
      ('add_watermark', ['parameters', 'addWatermark']),
      ('output_mime_type', ['parameters', 'outputOptions', 'mimeType']),
      (
          'output_compression_quality',
          ['parameters', 'outputOptions', 'compressionQuality'],
      ),
      ('enhance_prompt', ['parameters', 'enhancePrompt']),
      ('labels', ['labels']),
  )
  for field, path in destinations:
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, path, value)

  unused_config: dict[str, Any] = {}
  return unused_config


def _RecontextImageParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request dict for a recontext-image call.

  Args:
    api_client: Client forwarded to `t.t_model`.
    from_object: Parameters object; `model`, `source` and `config` are read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the source and config converters.

  Returns:
    The request dict. `_url.model` is set from `t.t_model`; the source and
    config converters receive this dict as their parent object and their
    own return values are discarded.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  source = getv(from_object, ['source'])
  if source is not None:
    _RecontextImageSource_to_vertex(source, request, root_object)

  config = getv(from_object, ['config'])
  if config is not None:
    _RecontextImageConfig_to_vertex(config, request, root_object)

  return request


def _RecontextImageResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a vertex recontext-image response to SDK field names.

  Args:
    from_object: Raw response; only `predictions` is read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_GeneratedImage_from_vertex`.

  Returns:
    A new dict whose `generated_images` holds one converted entry per
    prediction; empty when `predictions` is None.
  """
  response: dict[str, Any] = {}

  predictions = getv(from_object, ['predictions'])
  if predictions is not None:
    images = [
        _GeneratedImage_from_vertex(entry, response, root_object)
        for entry in predictions
    ]
    setv(response, ['generated_images'], images)

  return response


def _RecontextImageSource_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes recontext-image source fields into the parent request.

  `prompt`, `person_image` and `product_images` are stored under the
  parent's `instances[0]` path; images are converted with `_Image_to_vertex`
  and `_ProductImage_to_vertex`. None values are skipped.

  Args:
    from_object: Source object to read.
    parent_object: Request dict that receives the converted fields.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict that this function itself never writes to; it is only passed
    to the nested converters as their parent object.
  """
  local_parent: dict[str, Any] = {}

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(parent_object, ['instances[0]', 'prompt'], prompt)

  person_image = getv(from_object, ['person_image'])
  if person_image is not None:
    person = _Image_to_vertex(person_image, local_parent, root_object)
    setv(parent_object, ['instances[0]', 'personImage', 'image'], person)

  product_images = getv(from_object, ['product_images'])
  if product_images is not None:
    products = [
        _ProductImage_to_vertex(entry, local_parent, root_object)
        for entry in product_images
    ]
    setv(parent_object, ['instances[0]', 'productImages'], products)

  return local_parent


def _ReferenceImageAPI_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a reference image.

  `reference_image`, `mask_image_config` and `control_image_config` go
  through their own `_to_vertex` converters; the other fields are copied
  under their camelCase keys. None values are skipped.

  Args:
    from_object: Reference image to read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict holding the converted reference image.
  """
  reference: dict[str, Any] = {}

  def _copy(src: str, dst: str) -> None:
    # Pass-through: only the key name changes.
    value = getv(from_object, [src])
    if value is not None:
      setv(reference, [dst], value)

  image = getv(from_object, ['reference_image'])
  if image is not None:
    setv(
        reference,
        ['referenceImage'],
        _Image_to_vertex(image, reference, root_object),
    )

  _copy('reference_id', 'referenceId')
  _copy('reference_type', 'referenceType')

  mask_config = getv(from_object, ['mask_image_config'])
  if mask_config is not None:
    setv(
        reference,
        ['maskImageConfig'],
        _MaskReferenceConfig_to_vertex(mask_config, reference, root_object),
    )

  control_config = getv(from_object, ['control_image_config'])
  if control_config is not None:
    setv(
        reference,
        ['controlImageConfig'],
        _ControlReferenceConfig_to_vertex(
            control_config, reference, root_object
        ),
    )

  _copy('style_image_config', 'styleImageConfig')
  _copy('subject_image_config', 'subjectImageConfig')

  return reference


def _ReplicatedVoiceConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a replicated voice config.

  Args:
    from_object: Config to read.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `mimeType` and `voiceSampleAudio`, each written only
    when the source value is not None.

  Raises:
    ValueError: If `consent_audio` or `voice_consent_signature` is set.
      These checks run after the supported fields have been copied.
  """
  voice: dict[str, Any] = {}

  for src, dst in (
      ('mime_type', 'mimeType'),
      ('voice_sample_audio', 'voiceSampleAudio'),
  ):
    value = getv(from_object, [src])
    if value is not None:
      setv(voice, [dst], value)

  for field in ('consent_audio', 'voice_consent_signature'):
    if getv(from_object, [field]) is not None:
      raise ValueError(
          f'{field} parameter is only supported in Gemini Developer API mode,'
          ' not in Gemini Enterprise Agent Platform mode.'
      )

  return voice


def _SafetyAttributes_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Flattens mldev safety attributes into SDK field names.

  Args:
    from_object: Raw payload; `safetyAttributes.categories`,
      `safetyAttributes.scores` and `contentType` are read.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `categories`, `scores` and `content_type`, each written
    only when the source value is not None.
  """
  attributes: dict[str, Any] = {}

  categories = getv(from_object, ['safetyAttributes', 'categories'])
  if categories is not None:
    setv(attributes, ['categories'], categories)

  scores = getv(from_object, ['safetyAttributes', 'scores'])
  if scores is not None:
    setv(attributes, ['scores'], scores)

  content_type = getv(from_object, ['contentType'])
  if content_type is not None:
    setv(attributes, ['content_type'], content_type)

  return attributes


def _SafetyAttributes_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Flattens vertex safety attributes into SDK field names.

  Args:
    from_object: Raw payload; `safetyAttributes.categories`,
      `safetyAttributes.scores` and `contentType` are read.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `categories`, `scores` and `content_type`, each written
    only when the source value is not None.
  """
  attributes: dict[str, Any] = {}

  categories = getv(from_object, ['safetyAttributes', 'categories'])
  if categories is not None:
    setv(attributes, ['categories'], categories)

  scores = getv(from_object, ['safetyAttributes', 'scores'])
  if scores is not None:
    setv(attributes, ['scores'], scores)

  content_type = getv(from_object, ['contentType'])
  if content_type is not None:
    setv(attributes, ['content_type'], content_type)

  return attributes


def _SafetySetting_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API (mldev) form of a safety setting.

  Args:
    from_object: Safety setting; `category`, `method` and `threshold` are
      read, in that order.
    parent_object: Not read by this converter.
    root_object: Not read by this converter.

  Returns:
    A new dict with `category` and `threshold`, each written only when the
    source value is not None.

  Raises:
    ValueError: If `method` is set. The check runs after `category` has
      been copied and before `threshold` is read.
  """
  setting: dict[str, Any] = {}

  category = getv(from_object, ['category'])
  if category is not None:
    setv(setting, ['category'], category)

  method = getv(from_object, ['method'])
  if method is not None:
    raise ValueError(
        'method parameter is only supported in Gemini Enterprise Agent Platform'
        ' mode, not in Gemini Developer API mode.'
    )

  threshold = getv(from_object, ['threshold'])
  if threshold is not None:
    setv(setting, ['threshold'], threshold)

  return setting


def _ScribbleImage_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a scribble image wrapper.

  Args:
    from_object: Wrapper to read; only `image` is used.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_Image_to_vertex`.

  Returns:
    A new dict whose `image` key holds the converted image; empty when the
    source `image` is None.
  """
  scribble: dict[str, Any] = {}

  image = getv(from_object, ['image'])
  if image is not None:
    converted = _Image_to_vertex(image, scribble, root_object)
    setv(scribble, ['image'], converted)

  return scribble


def _SegmentImageConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes segment-image config options into the parent request.

  Each config field is stored at a fixed path in `parent_object`: all go
  under `parameters` except `labels`, which goes at the top level. None
  values are skipped.

  Args:
    from_object: Config to read.
    parent_object: Request dict that receives every output of this function.
    root_object: Not read by this converter.

  Returns:
    A new dict that this function never writes to.
  """
  # (config field, destination path in the parent), applied in this order.
  destinations = (
      ('mode', ['parameters', 'mode']),
      ('max_predictions', ['parameters', 'maxPredictions']),
      ('confidence_threshold', ['parameters', 'confidenceThreshold']),
      ('mask_dilation', ['parameters', 'maskDilation']),
      ('binary_color_threshold', ['parameters', 'binaryColorThreshold']),
      ('labels', ['labels']),
  )
  for field, path in destinations:
    value = getv(from_object, [field])
    if value is not None:
      setv(parent_object, path, value)

  unused_config: dict[str, Any] = {}
  return unused_config


def _SegmentImageParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request dict for a segment-image call.

  Args:
    api_client: Client forwarded to `t.t_model`.
    from_object: Parameters object; `model`, `source` and `config` are read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the source and config converters.

  Returns:
    The request dict. `_url.model` is set from `t.t_model`; the source and
    config converters receive this dict as their parent object and their
    own return values are discarded.
  """
  request: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(request, ['_url', 'model'], t.t_model(api_client, model))

  source = getv(from_object, ['source'])
  if source is not None:
    _SegmentImageSource_to_vertex(source, request, root_object)

  config = getv(from_object, ['config'])
  if config is not None:
    _SegmentImageConfig_to_vertex(config, request, root_object)

  return request


def _SegmentImageResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a vertex segment-image response to SDK field names.

  Args:
    from_object: Raw response; only `predictions` is read.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_GeneratedImageMask_from_vertex`.

  Returns:
    A new dict whose `generated_masks` holds one converted entry per
    prediction; empty when `predictions` is None.
  """
  response: dict[str, Any] = {}

  predictions = getv(from_object, ['predictions'])
  if predictions is not None:
    masks = [
        _GeneratedImageMask_from_vertex(entry, response, root_object)
        for entry in predictions
    ]
    setv(response, ['generated_masks'], masks)

  return response


def _SegmentImageSource_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Writes segment-image source fields into the parent request.

  `prompt`, `image` and `scribble_image` are stored under the parent's
  `instances[0]` path; the two images are converted with `_Image_to_vertex`
  and `_ScribbleImage_to_vertex`. None values are skipped.

  Args:
    from_object: Source object to read.
    parent_object: Request dict that receives the converted fields.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict that this function itself never writes to; it is only passed
    to the nested converters as their parent object.
  """
  local_parent: dict[str, Any] = {}

  prompt = getv(from_object, ['prompt'])
  if prompt is not None:
    setv(parent_object, ['instances[0]', 'prompt'], prompt)

  image = getv(from_object, ['image'])
  if image is not None:
    converted_image = _Image_to_vertex(image, local_parent, root_object)
    setv(parent_object, ['instances[0]', 'image'], converted_image)

  scribble_image = getv(from_object, ['scribble_image'])
  if scribble_image is not None:
    scribble = _ScribbleImage_to_vertex(
        scribble_image, local_parent, root_object
    )
    setv(parent_object, ['instances[0]', 'scribble'], scribble)

  return local_parent


def _SpeakerVoiceConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a per-speaker voice config.

  Args:
    from_object: Config to read; `speaker` and `voice_config` are used.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to `_VoiceConfig_to_vertex`.

  Returns:
    A new dict with `speaker` copied as-is and `voiceConfig` converted by
    `_VoiceConfig_to_vertex`; None values are skipped.
  """
  speaker_config: dict[str, Any] = {}

  speaker = getv(from_object, ['speaker'])
  if speaker is not None:
    setv(speaker_config, ['speaker'], speaker)

  voice_config = getv(from_object, ['voice_config'])
  if voice_config is not None:
    converted = _VoiceConfig_to_vertex(
        voice_config, speaker_config, root_object
    )
    setv(speaker_config, ['voiceConfig'], converted)

  return speaker_config


def _SpeechConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the vertex request form of a speech config.

  Args:
    from_object: Config to read; `voice_config`, `language_code` and
      `multi_speaker_voice_config` are used, in that order.
    parent_object: Not read by this converter.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict with `voiceConfig` (via `_VoiceConfig_to_vertex`),
    `languageCode` (copied as-is) and `multiSpeakerVoiceConfig` (via
    `_MultiSpeakerVoiceConfig_to_vertex`); None values are skipped.
  """
  speech: dict[str, Any] = {}

  voice_config = getv(from_object, ['voice_config'])
  if voice_config is not None:
    converted_voice = _VoiceConfig_to_vertex(voice_config, speech, root_object)
    setv(speech, ['voiceConfig'], converted_voice)

  language_code = getv(from_object, ['language_code'])
  if language_code is not None:
    setv(speech, ['languageCode'], language_code)

  multi_speaker = getv(from_object, ['multi_speaker_voice_config'])
  if multi_speaker is not None:
    converted_multi = _MultiSpeakerVoiceConfig_to_vertex(
        multi_speaker, speech, root_object
    )
    setv(speech, ['multiSpeakerVoiceConfig'], converted_multi)

  return speech


def _ToolConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a tool config into the Gemini Developer API request dict.

  Args:
    from_object: Source value; `retrieval_config`, `function_calling_config`
      and `include_server_side_tool_invocations` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_FunctionCallingConfig_to_mldev`.

  Returns:
    A new dict with the camelCase equivalents of the non-None source fields.
  """
  to_object: dict[str, Any] = {}

  retrieval_config = getv(from_object, ['retrieval_config'])
  if retrieval_config is not None:
    setv(to_object, ['retrievalConfig'], retrieval_config)

  function_calling = getv(from_object, ['function_calling_config'])
  if function_calling is not None:
    converted_calling = _FunctionCallingConfig_to_mldev(
        function_calling, to_object, root_object
    )
    setv(to_object, ['functionCallingConfig'], converted_calling)

  include_invocations = getv(
      from_object, ['include_server_side_tool_invocations']
  )
  if include_invocations is not None:
    setv(to_object, ['includeServerSideToolInvocations'], include_invocations)

  return to_object


def _ToolConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a tool config into the Vertex-mode request dict.

  Args:
    from_object: Source value; `retrieval_config`, `function_calling_config`
      and `include_server_side_tool_invocations` are read from it.
    parent_object: Not used by this converter.
    root_object: Not used by this converter.

  Returns:
    A new dict with `retrievalConfig` and `functionCallingConfig` copied over
    for the source fields that were not None.

  Raises:
    ValueError: If `include_server_side_tool_invocations` is set, since this
      converter rejects that field.
  """
  to_object: dict[str, Any] = {}

  retrieval_config = getv(from_object, ['retrieval_config'])
  if retrieval_config is not None:
    setv(to_object, ['retrievalConfig'], retrieval_config)

  function_calling = getv(from_object, ['function_calling_config'])
  if function_calling is not None:
    setv(to_object, ['functionCallingConfig'], function_calling)

  include_invocations = getv(
      from_object, ['include_server_side_tool_invocations']
  )
  if include_invocations is not None:
    raise ValueError(
        'include_server_side_tool_invocations parameter is only supported in'
        ' Gemini Developer API mode, not in Gemini Enterprise Agent Platform'
        ' mode.'
    )

  return to_object


def _Tool_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a tool definition into the Gemini Developer API request dict.

  Fields are handled strictly in declaration order so that, when several
  problems are present, the same error surfaces first.

  Args:
    from_object: Source tool value whose snake_case fields are read.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict with the camelCase equivalents of the non-None source fields.

  Raises:
    ValueError: If `retrieval`, `enterprise_web_search` or
      `parallel_ai_search` is set; this converter rejects those fields.
  """
  to_object: dict[str, Any] = {}

  if getv(from_object, ['retrieval']) is not None:
    raise ValueError(
        'retrieval parameter is only supported in Gemini Enterprise Agent'
        ' Platform mode, not in Gemini Developer API mode.'
    )

  computer_use = getv(from_object, ['computer_use'])
  if computer_use is not None:
    setv(to_object, ['computerUse'], computer_use)

  file_search = getv(from_object, ['file_search'])
  if file_search is not None:
    setv(to_object, ['fileSearch'], file_search)

  google_search = getv(from_object, ['google_search'])
  if google_search is not None:
    converted_search = _GoogleSearch_to_mldev(
        google_search, to_object, root_object
    )
    setv(to_object, ['googleSearch'], converted_search)

  google_maps = getv(from_object, ['google_maps'])
  if google_maps is not None:
    converted_maps = _GoogleMaps_to_mldev(google_maps, to_object, root_object)
    setv(to_object, ['googleMaps'], converted_maps)

  code_execution = getv(from_object, ['code_execution'])
  if code_execution is not None:
    setv(to_object, ['codeExecution'], code_execution)

  if getv(from_object, ['enterprise_web_search']) is not None:
    raise ValueError(
        'enterprise_web_search parameter is only supported in Gemini Enterprise'
        ' Agent Platform mode, not in Gemini Developer API mode.'
    )

  function_declarations = getv(from_object, ['function_declarations'])
  if function_declarations is not None:
    # Shallow copy so the request owns its own list.
    setv(to_object, ['functionDeclarations'], list(function_declarations))

  search_retrieval = getv(from_object, ['google_search_retrieval'])
  if search_retrieval is not None:
    setv(to_object, ['googleSearchRetrieval'], search_retrieval)

  if getv(from_object, ['parallel_ai_search']) is not None:
    raise ValueError(
        'parallel_ai_search parameter is only supported in Gemini Enterprise'
        ' Agent Platform mode, not in Gemini Developer API mode.'
    )

  url_context = getv(from_object, ['url_context'])
  if url_context is not None:
    setv(to_object, ['urlContext'], url_context)

  mcp_servers = getv(from_object, ['mcp_servers'])
  if mcp_servers is not None:
    setv(to_object, ['mcpServers'], list(mcp_servers))

  return to_object


def _Tool_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a tool definition into the Vertex-mode request dict.

  Args:
    from_object: Source tool value whose snake_case fields are read.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_McpServer_to_vertex`.

  Returns:
    A new dict with the camelCase equivalents of the non-None source fields.

  Raises:
    ValueError: If `file_search` is set; this converter rejects that field.
  """
  to_object: dict[str, Any] = {}

  retrieval = getv(from_object, ['retrieval'])
  if retrieval is not None:
    setv(to_object, ['retrieval'], retrieval)

  computer_use = getv(from_object, ['computer_use'])
  if computer_use is not None:
    setv(to_object, ['computerUse'], computer_use)

  if getv(from_object, ['file_search']) is not None:
    raise ValueError(
        'file_search parameter is only supported in Gemini Developer API mode,'
        ' not in Gemini Enterprise Agent Platform mode.'
    )

  google_search = getv(from_object, ['google_search'])
  if google_search is not None:
    setv(to_object, ['googleSearch'], google_search)

  google_maps = getv(from_object, ['google_maps'])
  if google_maps is not None:
    setv(to_object, ['googleMaps'], google_maps)

  code_execution = getv(from_object, ['code_execution'])
  if code_execution is not None:
    setv(to_object, ['codeExecution'], code_execution)

  enterprise_search = getv(from_object, ['enterprise_web_search'])
  if enterprise_search is not None:
    setv(to_object, ['enterpriseWebSearch'], enterprise_search)

  function_declarations = getv(from_object, ['function_declarations'])
  if function_declarations is not None:
    # Shallow copy so the request owns its own list.
    setv(to_object, ['functionDeclarations'], list(function_declarations))

  search_retrieval = getv(from_object, ['google_search_retrieval'])
  if search_retrieval is not None:
    setv(to_object, ['googleSearchRetrieval'], search_retrieval)

  parallel_search = getv(from_object, ['parallel_ai_search'])
  if parallel_search is not None:
    setv(to_object, ['parallelAiSearch'], parallel_search)

  url_context = getv(from_object, ['url_context'])
  if url_context is not None:
    setv(to_object, ['urlContext'], url_context)

  mcp_servers = getv(from_object, ['mcp_servers'])
  if mcp_servers is not None:
    converted_servers = []
    for server in mcp_servers:
      converted_servers.append(
          _McpServer_to_vertex(server, to_object, root_object)
      )
    setv(to_object, ['mcpServers'], converted_servers)

  return to_object


def _TunedModelInfo_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Extracts tuned-model info fields from a Vertex response object.

  Args:
    from_object: Response value; the base-model label, `createTime` and
      `updateTime` are read from it.
    parent_object: Not used by this converter.
    root_object: Not used by this converter.

  Returns:
    A new dict with `base_model`, `create_time` and `update_time` set for the
    source values that were not None.
  """
  to_object: dict[str, Any] = {}

  # (source path, destination key), processed in this order.
  field_map = (
      (('labels', 'google-vertex-llm-tuning-base-model-id'), 'base_model'),
      (('createTime',), 'create_time'),
      (('updateTime',), 'update_time'),
  )
  for source_path, target_key in field_map:
    value = getv(from_object, list(source_path))
    if value is not None:
      setv(to_object, [target_key], value)

  return to_object


def _UpdateModelConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Copies update-model config fields onto the parent request dict.

  Args:
    from_object: Config value; `display_name`, `description` and
      `default_checkpoint_id` are read from it.
    parent_object: Dict that receives the camelCase fields.
    root_object: Not used by this converter.

  Returns:
    An empty dict; all output is written to `parent_object`.
  """
  to_object: dict[str, Any] = {}

  # (source field, destination key on the parent), processed in this order.
  field_map = (
      ('display_name', 'displayName'),
      ('description', 'description'),
      ('default_checkpoint_id', 'defaultCheckpointId'),
  )
  for source_key, target_key in field_map:
    value = getv(from_object, [source_key])
    if value is not None:
      setv(parent_object, [target_key], value)

  return to_object


def _UpdateModelConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Copies update-model config fields onto the parent request dict.

  Args:
    from_object: Config value; `display_name`, `description` and
      `default_checkpoint_id` are read from it.
    parent_object: Dict that receives the camelCase fields.
    root_object: Not used by this converter.

  Returns:
    An empty dict; all output is written to `parent_object`.
  """
  to_object: dict[str, Any] = {}

  # (source field, destination key on the parent), processed in this order.
  field_map = (
      ('display_name', 'displayName'),
      ('description', 'description'),
      ('default_checkpoint_id', 'defaultCheckpointId'),
  )
  for source_key, target_key in field_map:
    value = getv(from_object, [source_key])
    if value is not None:
      setv(parent_object, [target_key], value)

  return to_object


def _UpdateModelParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Gemini Developer API request dict for a model update.

  Args:
    api_client: Client passed to `t.t_model` when transforming the model value.
    from_object: Parameters value; `model` and `config` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_UpdateModelConfig_to_mldev`.

  Returns:
    A new dict with the transformed model under `_url.name`, plus whatever
    fields the config converter writes into it.
  """
  to_object: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(to_object, ['_url', 'name'], t.t_model(api_client, model))

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes directly into `to_object` as its parent.
    _UpdateModelConfig_to_mldev(config, to_object, root_object)

  return to_object


def _UpdateModelParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Vertex-mode request dict for a model update.

  Args:
    api_client: Client passed to `t.t_model` when transforming the model value.
    from_object: Parameters value; `model` and `config` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_UpdateModelConfig_to_vertex`.

  Returns:
    A new dict with the transformed model under `_url.model`, plus whatever
    fields the config converter writes into it.
  """
  to_object: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(to_object, ['_url', 'model'], t.t_model(api_client, model))

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes directly into `to_object` as its parent.
    _UpdateModelConfig_to_vertex(config, to_object, root_object)

  return to_object


def _UpscaleImageAPIConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Copies upscale-image config fields onto the parent request dict.

  Most fields land under the parent's `parameters` entry; `labels` is written
  at the parent's top level.

  Args:
    from_object: Config value whose snake_case fields are read.
    parent_object: Dict that receives the converted fields.
    root_object: Not used by this converter.

  Returns:
    An empty dict; all output is written to `parent_object`.
  """
  to_object: dict[str, Any] = {}

  # (source field, destination path on the parent), processed in this order.
  field_map = (
      ('output_gcs_uri', ('parameters', 'storageUri')),
      ('safety_filter_level', ('parameters', 'safetySetting')),
      ('person_generation', ('parameters', 'personGeneration')),
      ('include_rai_reason', ('parameters', 'includeRaiReason')),
      ('output_mime_type', ('parameters', 'outputOptions', 'mimeType')),
      (
          'output_compression_quality',
          ('parameters', 'outputOptions', 'compressionQuality'),
      ),
      (
          'enhance_input_image',
          ('parameters', 'upscaleConfig', 'enhanceInputImage'),
      ),
      (
          'image_preservation_factor',
          ('parameters', 'upscaleConfig', 'imagePreservationFactor'),
      ),
      ('labels', ('labels',)),
      ('number_of_images', ('parameters', 'sampleCount')),
      ('mode', ('parameters', 'mode')),
  )
  for source_key, target_path in field_map:
    value = getv(from_object, [source_key])
    if value is not None:
      setv(parent_object, list(target_path), value)

  return to_object


def _UpscaleImageAPIParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Builds the Vertex-mode request dict for an upscale-image call.

  Args:
    api_client: Client passed to `t.t_model` when transforming the model value.
    from_object: Parameters value; `model`, `image`, `upscale_factor` and
      `config` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to the nested converters.

  Returns:
    A new dict with the model under `_url.model`, the image under
    `instances[0].image`, the factor under
    `parameters.upscaleConfig.upscaleFactor`, plus whatever the config
    converter writes into it.
  """
  to_object: dict[str, Any] = {}

  model = getv(from_object, ['model'])
  if model is not None:
    setv(to_object, ['_url', 'model'], t.t_model(api_client, model))

  image = getv(from_object, ['image'])
  if image is not None:
    converted_image = _Image_to_vertex(image, to_object, root_object)
    setv(to_object, ['instances[0]', 'image'], converted_image)

  upscale_factor = getv(from_object, ['upscale_factor'])
  if upscale_factor is not None:
    setv(
        to_object,
        ['parameters', 'upscaleConfig', 'upscaleFactor'],
        upscale_factor,
    )

  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes directly into `to_object` as its parent.
    _UpscaleImageAPIConfig_to_vertex(config, to_object, root_object)

  return to_object


def _UpscaleImageResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex upscale-image response into snake_case fields.

  Args:
    from_object: Response value; `sdkHttpResponse` and `predictions` are read
      from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_GeneratedImage_from_vertex`.

  Returns:
    A new dict with `sdk_http_response` and `generated_images` (one converted
    entry per prediction) for the source fields that were not None.
  """
  to_object: dict[str, Any] = {}

  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(to_object, ['sdk_http_response'], http_response)

  predictions = getv(from_object, ['predictions'])
  if predictions is not None:
    generated_images = []
    for prediction in predictions:
      generated_images.append(
          _GeneratedImage_from_vertex(prediction, to_object, root_object)
      )
    setv(to_object, ['generated_images'], generated_images)

  return to_object


def _VideoGenerationMask_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a video generation mask into the Vertex-mode request dict.

  Args:
    from_object: Source value; `image` and `mask_mode` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_Image_to_vertex`.

  Returns:
    A new dict with the converted image stored via the `_self` path and
    `maskMode` set, for the source fields that were not None.
  """
  to_object: dict[str, Any] = {}

  image = getv(from_object, ['image'])
  if image is not None:
    converted_image = _Image_to_vertex(image, to_object, root_object)
    # NOTE(review): `_self` is presumably special-cased by `setv` to merge
    # into the dict itself -- confirm in `_common.set_value_by_path`.
    setv(to_object, ['_self'], converted_image)

  mask_mode = getv(from_object, ['mask_mode'])
  if mask_mode is not None:
    setv(to_object, ['maskMode'], mask_mode)

  return to_object


def _VideoGenerationReferenceImage_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a video reference image into the Developer API request dict.

  Args:
    from_object: Source value; `image` and `reference_type` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_Image_to_mldev`.

  Returns:
    A new dict with `image` and `referenceType` set for the source fields
    that were not None.

  Raises:
    ValueError: If the reference type fails
      `_VideoGenerationReferenceType_to_mldev_enum_validate`.
  """
  to_object: dict[str, Any] = {}

  image = getv(from_object, ['image'])
  if image is not None:
    converted_image = _Image_to_mldev(image, to_object, root_object)
    setv(to_object, ['image'], converted_image)

  reference_type = getv(from_object, ['reference_type'])
  if reference_type is not None:
    # Validate before writing so an unsupported value never reaches the dict.
    _VideoGenerationReferenceType_to_mldev_enum_validate(reference_type)
    setv(to_object, ['referenceType'], reference_type)

  return to_object


def _VideoGenerationReferenceImage_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a video reference image into the Vertex-mode request dict.

  Args:
    from_object: Source value; `image` and `reference_type` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_Image_to_vertex`.

  Returns:
    A new dict with `image` and `referenceType` set for the source fields
    that were not None.
  """
  to_object: dict[str, Any] = {}

  image = getv(from_object, ['image'])
  if image is not None:
    converted_image = _Image_to_vertex(image, to_object, root_object)
    setv(to_object, ['image'], converted_image)

  reference_type = getv(from_object, ['reference_type'])
  if reference_type is not None:
    setv(to_object, ['referenceType'], reference_type)

  return to_object


def _Video_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Developer API video payload into snake_case fields.

  Args:
    from_object: Response value; `uri`, `encodedVideo` and `encoding` are read
      from it.
    parent_object: Not used by this converter.
    root_object: Not used by this converter.

  Returns:
    A new dict with `uri`, `video_bytes` (passed through `base_t.t_bytes`) and
    `mime_type` set for the source fields that were not None.
  """
  to_object: dict[str, Any] = {}

  uri = getv(from_object, ['uri'])
  if uri is not None:
    setv(to_object, ['uri'], uri)

  encoded_video = getv(from_object, ['encodedVideo'])
  if encoded_video is not None:
    setv(to_object, ['video_bytes'], base_t.t_bytes(encoded_video))

  encoding = getv(from_object, ['encoding'])
  if encoding is not None:
    setv(to_object, ['mime_type'], encoding)

  return to_object


def _Video_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a Vertex video payload into snake_case fields.

  Args:
    from_object: Response value; `gcsUri`, `bytesBase64Encoded` and `mimeType`
      are read from it.
    parent_object: Not used by this converter.
    root_object: Not used by this converter.

  Returns:
    A new dict with `uri`, `video_bytes` (passed through `base_t.t_bytes`) and
    `mime_type` set for the source fields that were not None.
  """
  to_object: dict[str, Any] = {}

  gcs_uri = getv(from_object, ['gcsUri'])
  if gcs_uri is not None:
    setv(to_object, ['uri'], gcs_uri)

  encoded_bytes = getv(from_object, ['bytesBase64Encoded'])
  if encoded_bytes is not None:
    setv(to_object, ['video_bytes'], base_t.t_bytes(encoded_bytes))

  mime_type = getv(from_object, ['mimeType'])
  if mime_type is not None:
    setv(to_object, ['mime_type'], mime_type)

  return to_object


def _Video_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a video value into the Developer API request dict.

  Args:
    from_object: Source value; `uri`, `video_bytes` and `mime_type` are read
      from it.
    parent_object: Not used by this converter.
    root_object: Not used by this converter.

  Returns:
    A new dict with `uri`, `encodedVideo` (passed through `base_t.t_bytes`)
    and `encoding` set for the source fields that were not None.
  """
  to_object: dict[str, Any] = {}

  uri = getv(from_object, ['uri'])
  if uri is not None:
    setv(to_object, ['uri'], uri)

  video_bytes = getv(from_object, ['video_bytes'])
  if video_bytes is not None:
    setv(to_object, ['encodedVideo'], base_t.t_bytes(video_bytes))

  mime_type = getv(from_object, ['mime_type'])
  if mime_type is not None:
    setv(to_object, ['encoding'], mime_type)

  return to_object


def _Video_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a video value into the Vertex-mode request dict.

  Args:
    from_object: Source value; `uri`, `video_bytes` and `mime_type` are read
      from it.
    parent_object: Not used by this converter.
    root_object: Not used by this converter.

  Returns:
    A new dict with `gcsUri`, `bytesBase64Encoded` (passed through
    `base_t.t_bytes`) and `mimeType` set for the source fields that were not
    None.
  """
  to_object: dict[str, Any] = {}

  uri = getv(from_object, ['uri'])
  if uri is not None:
    setv(to_object, ['gcsUri'], uri)

  video_bytes = getv(from_object, ['video_bytes'])
  if video_bytes is not None:
    setv(to_object, ['bytesBase64Encoded'], base_t.t_bytes(video_bytes))

  mime_type = getv(from_object, ['mime_type'])
  if mime_type is not None:
    setv(to_object, ['mimeType'], mime_type)

  return to_object


def _VoiceConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
    root_object: Optional[Union[dict[str, Any], object]] = None,
) -> dict[str, Any]:
  """Converts a voice config into the Vertex-mode request dict.

  Args:
    from_object: Source value; `replicated_voice_config` and
      `prebuilt_voice_config` are read from it.
    parent_object: Not used by this converter.
    root_object: Forwarded unchanged to `_ReplicatedVoiceConfig_to_vertex`.

  Returns:
    A new dict with `replicatedVoiceConfig` and `prebuiltVoiceConfig` set for
    the source fields that were not None.
  """
  to_object: dict[str, Any] = {}

  replicated = getv(from_object, ['replicated_voice_config'])
  if replicated is not None:
    converted_replicated = _ReplicatedVoiceConfig_to_vertex(
        replicated, to_object, root_object
    )
    setv(to_object, ['replicatedVoiceConfig'], converted_replicated)

  prebuilt = getv(from_object, ['prebuilt_voice_config'])
  if prebuilt is not None:
    setv(to_object, ['prebuiltVoiceConfig'], prebuilt)

  return to_object


class Models(_api_module.BaseModule):

  def _generate_content(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> types.GenerateContentResponse:
    """Sends one non-streaming `generateContent` POST and parses the reply.

    Args:
      model: Model identifier, passed to the request parameters.
      contents: Contents to send, passed to the request parameters.
      config: Optional request configuration.

    Returns:
      The parsed response with `sdk_http_response` headers attached. When
      `config` has a truthy `should_return_http_response` attribute, a
      response wrapping only the raw HTTP response is returned instead.
    """
    parameter_model = types._GenerateContentParameters(
        model=model,
        contents=contents,
        config=config,
    )
    client = self._api_client

    if client.vertexai:
      request_dict = _GenerateContentParameters_to_vertex(
          client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GenerateContentParameters_to_mldev(
          client, parameter_model, None, parameter_model
      )

    # Both backends use the same path template.
    path = '{model}:generateContent'
    request_url_dict: Optional[dict[str, str]] = request_dict.get('_url')
    if request_url_dict:
      path = path.format_map(request_url_dict)

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    parsed_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        None if parsed_config is None else parsed_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = client.request('post', path, request_dict, http_options)

    # Checked on the caller-supplied `config`, not the parsed one.
    wants_raw_response = config is not None and getattr(
        config, 'should_return_http_response', None
    )
    if wants_raw_response:
      raw_result = types.GenerateContentResponse(sdk_http_response=response)
      client._verify_response(raw_result)
      return raw_result

    response_dict = json.loads(response.body) if response.body else {}

    if client.vertexai:
      response_dict = _GenerateContentResponse_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _GenerateContentResponse_from_mldev(
          response_dict, None, parameter_model
      )

    from_response_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      from_response_kwargs = {
          'config': {
              'response_schema': getattr(
                  parameter_model.config, 'response_schema', None
              ),
              'response_json_schema': getattr(
                  parameter_model.config, 'response_json_schema', None
              ),
              'include_all_fields': getattr(
                  parameter_model.config, 'include_all_fields', None
              ),
          }
      }

    return_value = types.GenerateContentResponse._from_response(
        response=response_dict,
        kwargs=from_response_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    client._verify_response(return_value)
    return return_value

  def _generate_content_stream(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> Iterator[types.GenerateContentResponse]:
    """Streams `streamGenerateContent` responses as they arrive.

    This is a generator: no request work happens until iteration begins.

    Args:
      model: Model identifier, passed to the request parameters.
      contents: Contents to send, passed to the request parameters.
      config: Optional request configuration.

    Yields:
      One parsed response per streamed chunk, each with `sdk_http_response`
      headers attached.

    Raises:
      ValueError: If `config` has a truthy `should_return_http_response`
        attribute, which streaming does not support.
    """
    parameter_model = types._GenerateContentParameters(
        model=model,
        contents=contents,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _GenerateContentParameters_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GenerateContentParameters_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )

    # Both backends use the same path template.
    path = '{model}:streamGenerateContent?alt=sse'
    request_url_dict = request_dict.get('_url')
    if request_url_dict:
      path = path.format_map(request_url_dict)

    query_params = request_dict.get('_query')
    if query_params:
      # The template already carries `?alt=sse`, so further parameters must
      # be joined with `&`; a second `?` would fold them into the `alt`
      # value.
      path = f'{path}&{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    if config is not None and getattr(
        config, 'should_return_http_response', None
    ):
      raise ValueError(
          'Accessing the raw HTTP response is not supported in streaming'
          ' methods.'
      )

    for response in self._api_client.request_streamed(
        'post', path, request_dict, http_options
    ):

      response_dict = {} if not response.body else json.loads(response.body)

      if self._api_client.vertexai:
        response_dict = _GenerateContentResponse_from_vertex(
            response_dict, None, parameter_model
        )
      else:
        response_dict = _GenerateContentResponse_from_mldev(
            response_dict, None, parameter_model
        )

      # Rebuilt for every chunk so each `_from_response` call gets a fresh
      # kwargs dict.
      return_value = types.GenerateContentResponse._from_response(
          response=response_dict,
          kwargs={
              'config': {
                  'response_schema': getattr(
                      parameter_model.config, 'response_schema', None
                  ),
                  'response_json_schema': getattr(
                      parameter_model.config, 'response_json_schema', None
                  ),
                  'include_all_fields': getattr(
                      parameter_model.config, 'include_all_fields', None
                  ),
              }
          }
          if getattr(parameter_model, 'config', None)
          else {},
      )
      return_value.sdk_http_response = types.HttpResponse(
          headers=response.headers
      )
      self._api_client._verify_response(return_value)
      yield return_value

  def _embed_content(
      self,
      *,
      model: str,
      contents: Optional[
          Union[types.ContentListUnion, types.ContentListUnionDict]
      ] = None,
      content: Optional[
          Union[types.ContentUnion, types.ContentUnionDict]
      ] = None,
      embedding_api_type: Optional[types.EmbeddingApiType] = None,
      config: Optional[types.EmbedContentConfigOrDict] = None,
  ) -> types.EmbedContentResponse:
    """Requests embeddings for the given contents.

    In Vertex mode the endpoint is `{model}:embedContent` when
    `t.t_is_vertex_embed_content_model` accepts the model and
    `{model}:predict` otherwise; in Developer API mode it is
    `{model}:batchEmbedContents`.

    Args:
      model (str): The model to use.
      contents (list[Content]): The contents to embed.
      content: Forwarded to the request parameters alongside `contents`.
      embedding_api_type: Forwarded to the request parameters.
      config (EmbedContentConfig): Optional configuration for embeddings.

    Returns:
      The parsed embedding response with `sdk_http_response` headers attached.

    Usage:

    .. code-block:: python

      embeddings = client.models.embed_content(
          model= 'text-embedding-004',
          contents=[
              'What is your name?',
              'What is your favorite color?',
          ],
          config={
              'output_dimensionality': 64
          },
      )
    """

    parameter_model = types._EmbedContentParametersPrivate(
        model=model,
        contents=contents,
        content=content,
        embedding_api_type=embedding_api_type,
        config=config,
    )
    client = self._api_client

    if client.vertexai:
      request_dict = _EmbedContentParametersPrivate_to_vertex(
          client, parameter_model, None, parameter_model
      )
      if t.t_is_vertex_embed_content_model(parameter_model.model):  # type: ignore[arg-type]
        path = '{model}:embedContent'
      else:
        path = '{model}:predict'
    else:
      request_dict = _EmbedContentParametersPrivate_to_mldev(
          client, parameter_model, None, parameter_model
      )
      path = '{model}:batchEmbedContents'

    request_url_dict: Optional[dict[str, str]] = request_dict.get('_url')
    if request_url_dict:
      path = path.format_map(request_url_dict)

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    parsed_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        None if parsed_config is None else parsed_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = client.request('post', path, request_dict, http_options)

    response_dict = json.loads(response.body) if response.body else {}

    if client.vertexai:
      response_dict = _EmbedContentResponse_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _EmbedContentResponse_from_mldev(
          response_dict, None, parameter_model
      )

    from_response_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      from_response_kwargs = {
          'config': {
              'response_schema': getattr(
                  parameter_model.config, 'response_schema', None
              ),
              'response_json_schema': getattr(
                  parameter_model.config, 'response_json_schema', None
              ),
              'include_all_fields': getattr(
                  parameter_model.config, 'include_all_fields', None
              ),
          }
      }

    return_value = types.EmbedContentResponse._from_response(
        response=response_dict,
        kwargs=from_response_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    client._verify_response(return_value)
    return return_value

  def _generate_images(
      self,
      *,
      model: str,
      prompt: str,
      config: Optional[types.GenerateImagesConfigOrDict] = None,
  ) -> types.GenerateImagesResponse:
    """Private method for generating images.

    Posts to `{model}:predict` in both backend modes and parses the reply.

    Args:
      model: Model identifier, passed to the request parameters.
      prompt: Prompt text, passed to the request parameters.
      config: Optional request configuration.

    Returns:
      The parsed response with `sdk_http_response` headers attached.
    """

    parameter_model = types._GenerateImagesParameters(
        model=model,
        prompt=prompt,
        config=config,
    )
    client = self._api_client

    if client.vertexai:
      request_dict = _GenerateImagesParameters_to_vertex(
          client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GenerateImagesParameters_to_mldev(
          client, parameter_model, None, parameter_model
      )

    # Both backends use the same path template.
    path = '{model}:predict'
    request_url_dict: Optional[dict[str, str]] = request_dict.get('_url')
    if request_url_dict:
      path = path.format_map(request_url_dict)

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    parsed_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        None if parsed_config is None else parsed_config.http_options
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = client.request('post', path, request_dict, http_options)

    response_dict = json.loads(response.body) if response.body else {}

    if client.vertexai:
      response_dict = _GenerateImagesResponse_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _GenerateImagesResponse_from_mldev(
          response_dict, None, parameter_model
      )

    from_response_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      from_response_kwargs = {
          'config': {
              'response_schema': getattr(
                  parameter_model.config, 'response_schema', None
              ),
              'response_json_schema': getattr(
                  parameter_model.config, 'response_json_schema', None
              ),
              'include_all_fields': getattr(
                  parameter_model.config, 'include_all_fields', None
              ),
          }
      }

    return_value = types.GenerateImagesResponse._from_response(
        response=response_dict,
        kwargs=from_response_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    client._verify_response(return_value)
    return return_value

  def _edit_image(
      self,
      *,
      model: str,
      prompt: str,
      reference_images: list[types._ReferenceImageAPIOrDict],
      config: Optional[types.EditImageConfigOrDict] = None,
  ) -> types.EditImageResponse:
    """Sends an image editing request and parses the response.

    Args:
      model: The model to use; forwarded to the request converter.
      prompt: The prompt forwarded to the request converter.
      reference_images: Reference images forwarded to the request converter.
      config: Optional request configuration. When it carries `http_options`,
        they are passed to the HTTP call.

    Returns:
      The parsed `EditImageResponse`, with `sdk_http_response` holding the
      response headers.

    Raises:
      ValueError: If the client is not in Vertex mode (`vertexai` is falsy).
    """
    parameter_model = types._EditImageParameters(
        model=model,
        prompt=prompt,
        reference_images=reference_images,
        config=config,
    )
    api_client = self._api_client

    # Guard clause: there is no Developer API converter for this method.
    if not api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _EditImageParameters_to_vertex(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('post', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    response_dict = _EditImageResponse_from_vertex(
        raw_response, None, parameter_model
    )

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.EditImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    api_client._verify_response(return_value)
    return return_value

  def _upscale_image(
      self,
      *,
      model: str,
      image: types.ImageOrDict,
      upscale_factor: str,
      config: Optional[types._UpscaleImageAPIConfigOrDict] = None,
  ) -> types.UpscaleImageResponse:
    """Sends an image upscaling request and parses the response.

    Args:
      model: The model to use; forwarded to the request converter.
      image: The image forwarded to the request converter.
      upscale_factor: The upscale factor forwarded to the request converter.
      config: Optional request configuration. When it carries `http_options`,
        they are passed to the HTTP call.

    Returns:
      The parsed `UpscaleImageResponse`, with `sdk_http_response` holding the
      response headers.

    Raises:
      ValueError: If the client is not in Vertex mode (`vertexai` is falsy).
    """
    parameter_model = types._UpscaleImageAPIParameters(
        model=model,
        image=image,
        upscale_factor=upscale_factor,
        config=config,
    )
    api_client = self._api_client

    # Guard clause: there is no Developer API converter for this method.
    if not api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _UpscaleImageAPIParameters_to_vertex(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('post', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    response_dict = _UpscaleImageResponse_from_vertex(
        raw_response, None, parameter_model
    )

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.UpscaleImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    api_client._verify_response(return_value)
    return return_value

  def recontext_image(
      self,
      *,
      model: str,
      source: types.RecontextImageSourceOrDict,
      config: Optional[types.RecontextImageConfigOrDict] = None,
  ) -> types.RecontextImageResponse:
    """Recontextualizes an image.

    One kind of recontextualization is currently supported:
    1) Virtual Try-On: Generate images of persons modeling fashion products.

    Args:
      model (str): The model to use.
      source (RecontextImageSource): The source inputs (prompt, person_image,
        product_images) for image recontext. prompt is behind an allowlist.
        person_image and product_images are both required, and only one
        product image is supported currently.
      config (RecontextImageConfig): Configuration for recontextualization.

    Returns:
      The parsed `RecontextImageResponse`.

    Raises:
      ValueError: If the client is not in Vertex mode (`vertexai` is falsy).

    Usage:

      ```
      virtual_try_on_response = client.models.recontext_image(
          model="virtual-try-on-001",
          source=types.RecontextImageSource(
              person_image=types.Image.from_file(location=IMAGE1_FILE_PATH),
              product_images=[types.ProductImage(product_image=
                  types.Image.from_file(location=IMAGE2_FILE_PATH)
              )],
          ),
          config=types.RecontextImageConfig(
              number_of_images=1,
          ),
      )
      image = virtual_try_on_response.generated_images[0].image
      ```
    """
    parameter_model = types._RecontextImageParameters(
        model=model,
        source=source,
        config=config,
    )
    api_client = self._api_client

    # Guard clause: there is no Developer API converter for this method.
    if not api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _RecontextImageParameters_to_vertex(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('post', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    response_dict = _RecontextImageResponse_from_vertex(
        raw_response, None, parameter_model
    )

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.RecontextImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )

    api_client._verify_response(return_value)
    return return_value

  def segment_image(
      self,
      *,
      model: str,
      source: types.SegmentImageSourceOrDict,
      config: Optional[types.SegmentImageConfigOrDict] = None,
  ) -> types.SegmentImageResponse:
    """Segments an image, creating a mask of a specified area.

    Args:
      model (str): The model to use.
      source (SegmentImageSource): The source inputs (prompt, image,
        scribble_image) for image segmentation. The prompt is required for
        prompt mode and semantic mode and disallowed for other modes.
        scribble_image is required for the interactive mode and disallowed
        for other modes.
      config (SegmentImageConfig): Configuration for segmentation.

    Returns:
      The parsed `SegmentImageResponse`.

    Raises:
      ValueError: If the client is not in Vertex mode (`vertexai` is falsy).

    Usage:

      ```
      response = client.models.segment_image(
          model="image-segmentation-001",
          source=types.SegmentImageSource(
              image=types.Image.from_file(location=IMAGE_FILE_PATH),
          ),
      )

      mask_image = response.generated_masks[0].mask
      ```
    """
    parameter_model = types._SegmentImageParameters(
        model=model,
        source=source,
        config=config,
    )
    api_client = self._api_client

    # Guard clause: there is no Developer API converter for this method.
    if not api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _SegmentImageParameters_to_vertex(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('post', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    response_dict = _SegmentImageResponse_from_vertex(
        raw_response, None, parameter_model
    )

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.SegmentImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )

    api_client._verify_response(return_value)
    return return_value

  def get(
      self, *, model: str, config: Optional[types.GetModelConfigOrDict] = None
  ) -> types.Model:
    """Issues a GET request for a model and parses it into a `types.Model`.

    Args:
      model: The model to look up; forwarded to the request converter.
      config: Optional request configuration. When it carries `http_options`,
        they are passed to the HTTP call.

    Returns:
      The `types.Model` built from the response body.
    """
    parameter_model = types._GetModelParameters(
        model=model,
        config=config,
    )
    api_client = self._api_client
    use_vertex = api_client.vertexai

    # Choose the backend-specific request converter.
    to_request = (
        _GetModelParameters_to_vertex
        if use_vertex
        else _GetModelParameters_to_mldev
    )
    request_dict = to_request(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{name}'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('get', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    from_response = _Model_from_vertex if use_vertex else _Model_from_mldev
    response_dict = from_response(raw_response, None, parameter_model)

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.Model._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )

    api_client._verify_response(return_value)
    return return_value

  def _list(
      self, *, config: Optional[types.ListModelsConfigOrDict] = None
  ) -> types.ListModelsResponse:
    """Issues a GET request listing models and parses the response.

    Args:
      config: Optional request configuration. When it carries `http_options`,
        they are passed to the HTTP call.

    Returns:
      The parsed `ListModelsResponse`, with `sdk_http_response` holding the
      response headers.
    """
    parameter_model = types._ListModelsParameters(
        config=config,
    )
    api_client = self._api_client
    use_vertex = api_client.vertexai

    # Choose the backend-specific request converter.
    to_request = (
        _ListModelsParameters_to_vertex
        if use_vertex
        else _ListModelsParameters_to_mldev
    )
    request_dict = to_request(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{models_url}'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      filter_value = query.get('filter')
      if filter_value:
        # The filter is removed from the query mapping and placed first in the
        # query string verbatim; only the remaining parameters are urlencoded.
        # NOTE(review): the filter value is not URL-encoded here — presumably
        # intentional; confirm before changing.
        query.pop('filter')
        query_parts = [f'filter={filter_value}']
        if query:
          query_parts.append(urlencode(query))
        path = f'{path}?' + '&'.join(query_parts)
      else:
        path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('get', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    from_response = (
        _ListModelsResponse_from_vertex
        if use_vertex
        else _ListModelsResponse_from_mldev
    )
    response_dict = from_response(raw_response, None, parameter_model)

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.ListModelsResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    api_client._verify_response(return_value)
    return return_value

  def update(
      self,
      *,
      model: str,
      config: Optional[types.UpdateModelConfigOrDict] = None,
  ) -> types.Model:
    """Issues a PATCH request for a model and parses it into a `types.Model`.

    Args:
      model: The model to update; forwarded to the request converter.
      config: Optional request configuration. When it carries `http_options`,
        they are passed to the HTTP call.

    Returns:
      The `types.Model` built from the response body.
    """
    parameter_model = types._UpdateModelParameters(
        model=model,
        config=config,
    )
    api_client = self._api_client
    use_vertex = api_client.vertexai

    # The two backends use different placeholders in the path template:
    # '{model}' for Vertex and '{name}' for the Developer API.
    if use_vertex:
      request_dict = _UpdateModelParameters_to_vertex(
          api_client, parameter_model, None, parameter_model
      )
      path = '{model}'
    else:
      request_dict = _UpdateModelParameters_to_mldev(
          api_client, parameter_model, None, parameter_model
      )
      path = '{name}'

    # The template is only filled in when the converter produced '_url'.
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('patch', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    from_response = _Model_from_vertex if use_vertex else _Model_from_mldev
    response_dict = from_response(raw_response, None, parameter_model)

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.Model._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )

    api_client._verify_response(return_value)
    return return_value

  def delete(
      self,
      *,
      model: str,
      config: Optional[types.DeleteModelConfigOrDict] = None,
  ) -> types.DeleteModelResponse:
    """Issues a DELETE request for a model and parses the response.

    Args:
      model: The model to delete; forwarded to the request converter.
      config: Optional request configuration. When it carries `http_options`,
        they are passed to the HTTP call.

    Returns:
      The parsed `DeleteModelResponse`, with `sdk_http_response` holding the
      response headers.
    """
    parameter_model = types._DeleteModelParameters(
        model=model,
        config=config,
    )
    api_client = self._api_client
    use_vertex = api_client.vertexai

    # Choose the backend-specific request converter.
    to_request = (
        _DeleteModelParameters_to_vertex
        if use_vertex
        else _DeleteModelParameters_to_mldev
    )
    request_dict = to_request(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{name}'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('delete', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    from_response = (
        _DeleteModelResponse_from_vertex
        if use_vertex
        else _DeleteModelResponse_from_mldev
    )
    response_dict = from_response(raw_response, None, parameter_model)

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.DeleteModelResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    api_client._verify_response(return_value)
    return return_value

  def count_tokens(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.CountTokensConfigOrDict] = None,
  ) -> types.CountTokensResponse:
    """Counts the number of tokens in the given content.

    Multimodal input is supported for Gemini models.

    Args:
      model (str): The model to use for counting tokens.
      contents (list[types.Content]): The content to count tokens for.
      config (CountTokensConfig): The configuration for counting tokens.

    Returns:
      The parsed `CountTokensResponse`, with `sdk_http_response` holding the
      response headers.

    Usage:

    .. code-block:: python

      response = client.models.count_tokens(
          model='gemini-2.0-flash',
          contents='What is your name?',
      )
      print(response)
      # total_tokens=5 cached_content_token_count=None
    """
    parameter_model = types._CountTokensParameters(
        model=model,
        contents=contents,
        config=config,
    )
    api_client = self._api_client
    use_vertex = api_client.vertexai

    # Choose the backend-specific request converter.
    to_request = (
        _CountTokensParameters_to_vertex
        if use_vertex
        else _CountTokensParameters_to_mldev
    )
    request_dict = to_request(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{model}:countTokens'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('post', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    from_response = (
        _CountTokensResponse_from_vertex
        if use_vertex
        else _CountTokensResponse_from_mldev
    )
    response_dict = from_response(raw_response, None, parameter_model)

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.CountTokensResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    api_client._verify_response(return_value)
    return return_value

  def compute_tokens(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.ComputeTokensConfigOrDict] = None,
  ) -> types.ComputeTokensResponse:
    """Returns the tokens and token ids for a list of contents.

    Given a list of contents, returns a corresponding TokensInfo containing
    the list of tokens and list of token ids.

    This method is not supported by the Gemini Developer API.

    Args:
      model (str): The model to use.
      contents: The content to compute tokens for.
      config: Optional request configuration. When it carries `http_options`,
        they are passed to the HTTP call.

    Returns:
      The parsed `ComputeTokensResponse`, with `sdk_http_response` holding the
      response headers.

    Raises:
      ValueError: If the client is not in Vertex mode (`vertexai` is falsy).

    Usage:

    .. code-block:: python

      response = client.models.compute_tokens(
          model='gemini-2.0-flash',
          contents='What is your name?',
      )
      print(response)
      # tokens_info=[TokensInfo(role='user', token_ids=['1841', ...],
      # tokens=[b'What', b' is', b' your', b' name', b'?'])]
    """
    parameter_model = types._ComputeTokensParameters(
        model=model,
        contents=contents,
        config=config,
    )
    api_client = self._api_client

    # Guard clause: there is no Developer API converter for this method.
    if not api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _ComputeTokensParameters_to_vertex(
        api_client, parameter_model, None, parameter_model
    )

    # The template is only filled in when the converter produced '_url'.
    path = '{model}:computeTokens'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    query = request_dict.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'

    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    request_config = parameter_model.config
    http_options: Optional[types.HttpOptions] = (
        request_config.http_options if request_config is not None else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )
    response = api_client.request('post', path, request_dict, http_options)

    # An empty body is treated as an empty JSON object.
    raw_response = json.loads(response.body) if response.body else {}
    response_dict = _ComputeTokensResponse_from_vertex(
        raw_response, None, parameter_model
    )

    # Parsing options are forwarded only when a config was supplied.
    parse_kwargs: dict[str, Any] = {}
    if request_config:
      parse_kwargs = {
          'config': {
              option: getattr(request_config, option, None)
              for option in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.ComputeTokensResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    api_client._verify_response(return_value)
    return return_value

  def _generate_videos(
      self,
      *,
      model: str,
      prompt: Optional[str] = None,
      image: Optional[types.ImageOrDict] = None,
      video: Optional[types.VideoOrDict] = None,
      source: Optional[types.GenerateVideosSourceOrDict] = None,
      config: Optional[types.GenerateVideosConfigOrDict] = None,
  ) -> types.GenerateVideosOperation:
    """Posts a ``predictLongRunning`` request for video generation.

    The arguments are packed into a ``_GenerateVideosParameters`` model,
    converted to the request body for the active backend, posted, and the
    reply is converted back into a ``GenerateVideosOperation``.

    Args:
      model: The model to use.
      prompt: Optional text prompt.
      image: Optional input image.
      video: Optional input video.
      source: Optional combined input source.
      config: Optional configuration; its ``http_options``, when set, are
        forwarded to the HTTP request.

    Returns:
      The operation built from the response body, after it has been passed to
      the API client's ``_verify_response``.
    """

    parameter_model = types._GenerateVideosParameters(
        model=model,
        prompt=prompt,
        image=image,
        video=video,
        source=source,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    # Only the converter differs per backend; the path template is shared.
    if self._api_client.vertexai:
      request_dict = _GenerateVideosParameters_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GenerateVideosParameters_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )
    request_url_dict = request_dict.get('_url')
    path = '{model}:predictLongRunning'
    if request_url_dict:
      path = path.format_map(request_url_dict)

    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    request_config = parameter_model.config
    if request_config is not None and request_config.http_options is not None:
      http_options = request_config.http_options

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = self._api_client.request(
        'post', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _GenerateVideosOperation_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _GenerateVideosOperation_from_mldev(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config fields only when a config is present.
    from_response_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      from_response_kwargs = {
          'config': {
              field_name: getattr(parameter_model.config, field_name, None)
              for field_name in (
                  'response_schema',
                  'response_json_schema',
                  'include_all_fields',
              )
          }
      }

    return_value = types.GenerateVideosOperation._from_response(
        response=response_dict,
        kwargs=from_response_kwargs,
    )

    self._api_client._verify_response(return_value)
    return return_value

  def embed_content(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.EmbedContentConfigOrDict] = None,
  ) -> types.EmbedContentResponse:
    """Calculates embeddings for the given contents.

    Outside Vertex mode the request is forwarded directly (contents are first
    normalized with ``t.t_contents`` when the model name contains
    ``'gemini-embedding-2'``). In Vertex mode the call is routed either to the
    embedContent API (single content only) or to the predict API, depending on
    ``t.t_is_vertex_embed_content_model(model)``.

    Args:
      model (str): The model to use.
      contents (list[Content]): The contents to embed.
      config (EmbedContentConfig): Optional configuration for embeddings.

    Returns:
      The ``EmbedContentResponse`` produced by the underlying request.

    Raises:
      ValueError: In Vertex mode, when the model uses the embedContent API and
        more than one content is supplied.

    Usage:

    .. code-block:: python

      embeddings = client.models.embed_content(
          model= 'text-embedding-004',
          contents=[
              'What is your name?',
              'What is your favorite color?',
          ],
          config={
              'output_dimensionality': 64
          },
      )

      multimodal_embeddings = client.models.embed_content(
          model='gemini-embedding-2-preview',
          contents=[
              types.Part.from_uri(
                  file_uri='gs://generativeai-downloads/images/scones.jpg',
                  mime_type='image/jpeg',
              ),
          ],
          config={
              'output_dimensionality': 64
          },
      )
    """
    if not self._api_client.vertexai:
      if 'gemini-embedding-2' in model:
        contents = t.t_contents(contents)  # type: ignore[assignment]
      return self._embed_content(model=model, contents=contents, config=config)

    # Vertex models that do not use embedContent go through the predict API.
    if not t.t_is_vertex_embed_content_model(model):
      return self._embed_content(
          model=model,
          content=None,
          contents=contents,
          embedding_api_type=types.EmbeddingApiType.PREDICT,
          config=config,
      )

    single_content_list = t.t_contents(contents)
    if len(single_content_list) > 1:
      raise ValueError(
          'The embedContent API for this model only supports one content at a'
          ' time.'
      )
    return self._embed_content(
        model=model,
        contents=None,
        content=single_content_list[0],
        embedding_api_type=types.EmbeddingApiType.EMBED_CONTENT,
        config=config,
    )

  def generate_content(
      self,
      *,
      model: str,
      contents: types.ContentListUnionDict,
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> types.GenerateContentResponse:
    """Makes an API request to generate content using a model.

    Accepted `model` formats for the Gemini Enterprise Agent Platform API:
    - The Gemini model ID, for example: 'gemini-2.0-flash'
    - The full resource name starting with 'projects/', for example:
      'projects/my-project-id/locations/us-central1/publishers/google/models/gemini-2.0-flash'
    - The partial resource name with 'publishers/', for example:
      'publishers/google/models/gemini-2.0-flash'
    - `/` separated publisher and model name, for example:
      'google/gemini-2.0-flash'

    Accepted `model` formats for the Gemini API:
    - The Gemini model ID, for example: 'gemini-2.0-flash'
    - The model name starting with 'models/', for example:
      'models/gemini-2.0-flash'
    - For tuned models, the model name starting with 'tunedModels/',
      for example:
      'tunedModels/1234567890123456789'

    Some models support multimodal input and output.

    Built-in MCP support is an experimental feature.

    When automatic function calling (AFC) applies, the request is repeated
    with the function call and function response turns appended to the
    contents, until no function response parts are produced or the maximum
    number of remote calls is reached.

    Args:
      model: The model to use.
      contents: The contents to send to the model.
      config: Optional generation configuration.

    Returns:
      The response of the last request that was made.

    Raises:
      errors.UnsupportedFunctionError: If the parsed config's tools use an MCP
        session, which synchronous methods do not support.

    Usage:

    .. code-block:: python

      from google.genai import types
      from google import genai

      client = genai.Client(
          vertexai=True, project='my-project-id', location='us-central1'
      )

      response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents='''What is a good name for a flower shop that specializes in
          selling bouquets of dried flowers?'''
      )
      print(response.text)
      # **Elegant & Classic:**
      # * The Dried Bloom
      # * Everlasting Florals
      # * Timeless Petals

      response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[
          types.Part.from_text(text='What is shown in this image?'),
          types.Part.from_uri(file_uri='gs://generativeai-downloads/images/scones.jpg',
          mime_type='image/jpeg')
        ]
      )
      print(response.text)
      # The image shows a flat lay arrangement of freshly baked blueberry
      # scones.
    """

    afc_incompatible_indexes = (
        _extra_utils.find_afc_incompatible_tool_indexes(config)
    )
    parsed_config = _extra_utils.parse_config_for_mcp_usage(config)
    uses_mcp_session = (
        parsed_config
        and parsed_config.tools
        and _mcp_utils.has_mcp_session_usage(parsed_config.tools)
    )
    if uses_mcp_session:
      raise errors.UnsupportedFunctionError(
          'MCP sessions are not supported in synchronous methods.'
      )
    if _extra_utils.should_disable_afc(parsed_config):
      return self._generate_content(
          model=model, contents=contents, config=parsed_config
      )
    if afc_incompatible_indexes:
      # Count the tools as supplied by the caller; the warning is skipped when
      # every supplied tool was flagged as incompatible.
      supplied_tools = None
      if isinstance(config, types.GenerateContentConfig):
        supplied_tools = config.tools
      elif isinstance(config, dict):
        supplied_tools = config.get('tools', [])
      supplied_tools_count = len(supplied_tools) if supplied_tools else 0
      if len(afc_incompatible_indexes) != supplied_tools_count:
        indices_str = ', '.join(str(idx) for idx in afc_incompatible_indexes)
        logger.warning(
            'Tools at indices [%s] are not compatible with automatic function '
            'calling (AFC). AFC is disabled. If AFC is intended, please '
            'include python callables in the tool list, and do not include '
            'function declaration and MCP server in the tool list.',
            indices_str,
        )
      return self._generate_content(
          model=model, contents=contents, config=parsed_config
      )

    calls_left = _extra_utils.get_max_remote_calls_afc(parsed_config)
    logger.info(f'AFC is enabled with max remote calls: {calls_left}.')
    afc_history: list[types.Content] = []
    response = types.GenerateContentResponse()
    call_number = 0
    while calls_left > 0:
      call_number += 1
      response = self._generate_content(
          model=model, contents=contents, config=parsed_config
      )

      function_map = _extra_utils.get_function_map(parsed_config)
      if not function_map or not response:
        break
      first_candidate_parts = (
          response.candidates
          and response.candidates[0].content
          and response.candidates[0].content.parts
      )
      if not first_candidate_parts:
        break
      func_response_parts = _extra_utils.get_function_response_parts(
          response, function_map
      )
      if not func_response_parts:
        break
      logger.info(f'AFC remote call {call_number} is done.')
      calls_left -= 1
      if calls_left == 0:
        logger.info('Reached max remote calls for automatic function calling.')

      # Feed the model's function call and our function response back in as
      # the next two turns.
      model_turn = response.candidates[0].content
      function_turn = types.Content(
          role='user',
          parts=func_response_parts,
      )
      contents = t.t_contents(contents)  # type: ignore[assignment]
      if not afc_history:
        afc_history.extend(contents)  # type: ignore[arg-type]
      if isinstance(contents, list):
        contents.append(model_turn)  # type: ignore[arg-type]
        contents.append(function_turn)  # type: ignore[arg-type]
      afc_history.append(model_turn)
      afc_history.append(function_turn)

    attach_history = (
        _extra_utils.should_append_afc_history(parsed_config)
        and response is not None
    )
    if attach_history:
      response.automatic_function_calling_history = afc_history
    return response

  def generate_content_stream(
      self,
      *,
      model: str,
      contents: types.ContentListUnionDict,
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> Iterator[types.GenerateContentResponse]:
    """Makes an API request to generate content and yields the response in chunks.

    For the `model` parameter, supported formats for Gemini Enterprise Agent
    Platform API include:
    - The Gemini model ID, for example: 'gemini-2.0-flash'
    - The full resource name starts with 'projects/', for example:
      'projects/my-project-id/locations/us-central1/publishers/google/models/gemini-2.0-flash'
    - The partial resource name with 'publishers/', for example:
      'publishers/google/models/gemini-2.0-flash' or
    - `/` separated publisher and model name, for example:
      'google/gemini-2.0-flash'

    For the `model` parameter, supported formats for Gemini API include:
    - The Gemini model ID, for example: 'gemini-2.0-flash'
    - The model name starts with 'models/', for example:
      'models/gemini-2.0-flash'
    - For tuned models, the model name starts with 'tunedModels/',
      for example:
      'tunedModels/1234567890123456789'

    Some models support multimodal input and output.

    Built-in MCP support is an experimental feature.

    This method is a generator, so nothing in its body (including the
    validation below) runs until iteration starts.

    Args:
      model: The model to use.
      contents: The contents to send to the model.
      config: Optional generation configuration.

    Yields:
      ``GenerateContentResponse`` chunks. While automatic function calling
      (AFC) is active, chunks of the first request for which function response
      parts were produced are not yielded; chunks of later requests are always
      yielded.

    Raises:
      errors.UnsupportedFunctionError: If the parsed config's tools use an MCP
        session, which synchronous methods do not support.

    Usage:

    .. code-block:: python

      from google.genai import types
      from google import genai

      client = genai.Client(
          vertexai=True, project='my-project-id', location='us-central1'
      )

      for chunk in client.models.generate_content_stream(
        model='gemini-2.0-flash',
        contents='''What is a good name for a flower shop that specializes in
          selling bouquets of dried flowers?'''
      ):
        print(chunk.text)
      # **Elegant & Classic:**
      # * The Dried Bloom
      # * Everlasting Florals
      # * Timeless Petals

      for chunk in client.models.generate_content_stream(
        model='gemini-2.0-flash',
        contents=[
          types.Part.from_text(text='What is shown in this image?'),
          types.Part.from_uri(file_uri='gs://generativeai-downloads/images/scones.jpg',
          mime_type='image/jpeg')
        ]
      ):
        print(chunk.text)
      # The image shows a flat lay arrangement of freshly baked blueberry
      # scones.
    """

    # Computed from the caller's original config, before it is parsed below.
    incompatible_tools_indexes = (
        _extra_utils.find_afc_incompatible_tool_indexes(config)
    )
    parsed_config = _extra_utils.parse_config_for_mcp_usage(config)
    if (
        parsed_config
        and parsed_config.tools
        and _mcp_utils.has_mcp_session_usage(parsed_config.tools)
    ):
      raise errors.UnsupportedFunctionError(
          'MCP sessions are not supported in synchronous methods.'
      )
    # When AFC should be disabled, stream a single request and stop.
    if _extra_utils.should_disable_afc(parsed_config):
      yield from self._generate_content_stream(
          model=model, contents=contents, config=parsed_config
      )
      return

    if incompatible_tools_indexes:
      # Count the tools as supplied by the caller; the warning below is only
      # logged when the number of flagged tools differs from that count.
      original_tools_length = 0
      if isinstance(config, types.GenerateContentConfig):
        if config.tools:
          original_tools_length = len(config.tools)
      elif isinstance(config, dict):
        tools = config.get('tools', [])
        if tools:
          original_tools_length = len(tools)
      if len(incompatible_tools_indexes) != original_tools_length:
        indices_str = ', '.join(map(str, incompatible_tools_indexes))
        logger.warning(
            'Tools at indices [%s] are not compatible with automatic function '
            'calling. AFC will be disabled.',
            indices_str,
        )
      # Either way, fall back to a single streaming request without AFC.
      yield from self._generate_content_stream(
          model=model, contents=contents, config=parsed_config
      )
      return

    # With tool compatibility confirmed, validate that the configuration are
    # compatible with each other and raise an error if invalid.
    _extra_utils.raise_error_for_afc_incompatible_config(parsed_config)

    remaining_remote_calls_afc = _extra_utils.get_max_remote_calls_afc(
        parsed_config
    )
    logger.info(
        f'AFC is enabled with max remote calls: {remaining_remote_calls_afc}.'
    )
    automatic_function_calling_history: list[types.Content] = []
    # `chunk` and `func_response_parts` are read after the for-loops below, so
    # they are initialized here in case a stream produces no chunks.
    chunk = None
    func_response_parts = None
    i = 0
    # Each iteration issues one streaming request; `i` is its 1-based number.
    while remaining_remote_calls_afc > 0:
      i += 1
      response = self._generate_content_stream(
          model=model, contents=contents, config=parsed_config
      )

      function_map = _extra_utils.get_function_map(parsed_config)

      if i == 1:
        # First request gets a function call.
        # Then get function response parts.
        # Yield chunks only if there's no function response parts.
        for chunk in response:
          if not function_map:
            contents = _extra_utils.append_chunk_contents(contents, chunk)  # type: ignore[assignment]
            yield chunk
          else:
            # A chunk without candidates/content/parts stops consumption of
            # this stream; that chunk is not yielded.
            if (
                not chunk.candidates
                or not chunk.candidates[0].content
                or not chunk.candidates[0].content.parts
            ):
              break
            # Re-evaluated per chunk, so after the loop this holds the result
            # of the most recent call.
            func_response_parts = _extra_utils.get_function_response_parts(
                chunk, function_map
            )
            if not func_response_parts:
              contents = _extra_utils.append_chunk_contents(contents, chunk)  # type: ignore[assignment]
              yield chunk

      else:
        #  Second request and beyond, yield chunks.
        for chunk in response:
          if _extra_utils.should_append_afc_history(parsed_config):
            chunk.automatic_function_calling_history = (
                automatic_function_calling_history
            )
          contents = _extra_utils.append_chunk_contents(contents, chunk)  # type: ignore[assignment]
          yield chunk
        # Only the last chunk of the stream is inspected for further function
        # calls.
        if (
            chunk is None
            or not chunk.candidates
            or not chunk.candidates[0].content
            or not chunk.candidates[0].content.parts
        ):
          break
        func_response_parts = _extra_utils.get_function_response_parts(
            chunk, function_map
        )

      # Stop when there is nothing to call or no function response was
      # produced.
      if not function_map:
        break
      if not func_response_parts:
        break
      logger.info(f'AFC remote call {i} is done.')
      remaining_remote_calls_afc -= 1
      if remaining_remote_calls_afc == 0:
        logger.info('Reached max remote calls for automatic function calling.')

      # Append function response parts to contents for the next request.
      if chunk is not None and chunk.candidates is not None:
        func_call_content = chunk.candidates[0].content
        func_response_content = types.Content(
            role='user',
            parts=func_response_parts,
        )
        contents = t.t_contents(contents)  # type: ignore[assignment]
        # Seed the history with the contents so far the first time through.
        if not automatic_function_calling_history:
          automatic_function_calling_history.extend(contents)  # type: ignore[arg-type]
        if isinstance(contents, list) and func_call_content is not None:
          contents.append(func_call_content)  # type: ignore[arg-type]
          contents.append(func_response_content)  # type: ignore[arg-type]
        if func_call_content is not None:
          automatic_function_calling_history.append(func_call_content)
        automatic_function_calling_history.append(func_response_content)

  def generate_images(
      self,
      *,
      model: str,
      prompt: str,
      config: Optional[types.GenerateImagesConfigOrDict] = None,
  ) -> types.GenerateImagesResponse:
    """Generates images based on a text description and configuration.

    Entries of the raw response whose safety attributes have content type
    ``'Positive Prompt'`` are not returned as images; their safety attributes
    are exposed as ``positive_prompt_safety_attributes`` instead.

    Args:
      model (str): The model to use.
      prompt (str): A text description of the images to generate.
      config (GenerateImagesConfig): Configuration for generation.

    Returns:
      A ``GenerateImagesResponse`` holding the remaining generated images, or
      the raw response unchanged when it contains no generated images.

    Usage:

    .. code-block:: python

      response = client.models.generate_images(
        model='imagen-3.0-generate-002',
        prompt='Man with a dog',
        config=types.GenerateImagesConfig(
            number_of_images= 1,
            include_rai_reason= True,
        )
      )
      response.generated_images[0].image.show()
      # Shows a man with a dog.
    """
    api_response = self._generate_images(
        model=model,
        prompt=prompt,
        config=config,
    )
    if not api_response or not api_response.generated_images:
      return api_response

    prompt_safety_attributes = None
    image_entries = []
    for entry in api_response.generated_images:
      attributes = entry.safety_attributes
      if attributes and attributes.content_type == 'Positive Prompt':
        # Not an image: remember the prompt's safety attributes instead.
        prompt_safety_attributes = attributes
        continue
      image_entries.append(entry)

    return types.GenerateImagesResponse(
        generated_images=image_entries,
        positive_prompt_safety_attributes=prompt_safety_attributes,
    )

  def edit_image(
      self,
      *,
      model: str,
      prompt: str,
      reference_images: list[types._ReferenceImageAPIOrDict],
      config: Optional[types.EditImageConfigOrDict] = None,
  ) -> types.EditImageResponse:
    """Edits an image based on a text description and configuration.

    Args:
      model (str): The model to use.
      prompt (str): A text description of the edit to apply to the image.
      reference_images (list[Union[RawReferenceImage, MaskReferenceImage,
        ControlReferenceImage, StyleReferenceImage, SubjectReferenceImage]]):
        The reference images for editing.
      config (EditImageConfig): Configuration for editing.

    Returns:
      The ``EditImageResponse`` returned by the underlying request.

    Usage:

    .. code-block:: python

      from google.genai.types import RawReferenceImage, MaskReferenceImage

      raw_ref_image = RawReferenceImage(
        reference_id=1,
        reference_image=types.Image.from_file(location=IMAGE_FILE_PATH),
      )

      mask_ref_image = MaskReferenceImage(
        reference_id=2,
        config=types.MaskReferenceConfig(
            mask_mode='MASK_MODE_FOREGROUND',
            mask_dilation=0.06,
        ),
      )
      response = client.models.edit_image(
        model='imagen-3.0-capability-001',
        prompt='Man with dog',
        reference_images=[raw_ref_image, mask_ref_image],
        config=types.EditImageConfig(
            edit_mode= "EDIT_MODE_INPAINT_INSERTION",
            number_of_images= 1,
            include_rai_reason= True,
        )
      )
      response.generated_images[0].image.show()
      # Shows a man with a dog instead of a cat.
    """
    # Pure pass-through to the private request method.
    edit_response = self._edit_image(
        model=model,
        prompt=prompt,
        reference_images=reference_images,
        config=config,
    )
    return edit_response

  def upscale_image(
      self,
      *,
      model: str,
      image: types.ImageOrDict,
      upscale_factor: str,
      config: Optional[types.UpscaleImageConfigOrDict] = None,
  ) -> types.UpscaleImageResponse:
    """Makes an API request to upscale a provided image.

    The user-facing config is validated, then translated into the internal API
    config with ``mode`` fixed to ``'upscale'`` and ``number_of_images`` fixed
    to ``1``.

    Args:
      model (str): The model to use.
      image (Image): The input image for upscaling.
      upscale_factor (str): The factor to upscale the image (x2 or x4).
      config (UpscaleImageConfig): Configuration for upscaling.

    Returns:
      The ``UpscaleImageResponse`` returned by the underlying request.

    Usage:

    .. code-block:: python

      from google.genai.types import Image

      IMAGE_FILE_PATH="my-image.png"
      response=client.models.upscale_image(
          model='imagen-3.0-generate-001',
          image=types.Image.from_file(location=IMAGE_FILE_PATH),
          upscale_factor='x2',
      )
      response.generated_images[0].image.show()
      # Opens my-image.png which is upscaled by a factor of 2.
    """

    # Constructing the parameters model validates the arguments; the instance
    # itself is not needed.
    types.UpscaleImageParameters(
        model=model,
        image=image,
        upscale_factor=upscale_factor,
        config=config,
    )

    # Flatten the user config (model instance, dict, or nothing) into a dict.
    config = config or {}
    config_dct = (
        config.model_dump()
        if isinstance(config, types.UpscaleImageConfig)
        else dict(config)
    )

    # Copy the supported fields and pin the upscale-specific defaults last.
    api_config = types._UpscaleImageAPIConfigDict(
        http_options=config_dct.get('http_options'),
        output_gcs_uri=config_dct.get('output_gcs_uri'),
        safety_filter_level=config_dct.get('safety_filter_level'),
        person_generation=config_dct.get('person_generation'),
        include_rai_reason=config_dct.get('include_rai_reason'),
        output_mime_type=config_dct.get('output_mime_type'),
        output_compression_quality=config_dct.get('output_compression_quality'),
        enhance_input_image=config_dct.get('enhance_input_image'),
        image_preservation_factor=config_dct.get('image_preservation_factor'),
        labels=config_dct.get('labels'),
        mode='upscale',
        number_of_images=1,
    )  # pylint: disable=protected-access

    return self._upscale_image(
        model=model,
        image=image,
        upscale_factor=upscale_factor,
        config=api_config,
    )

  def generate_videos(
      self,
      *,
      model: str,
      prompt: Optional[str] = None,
      image: Optional[types.ImageOrDict] = None,
      video: Optional[types.VideoOrDict] = None,
      source: Optional[types.GenerateVideosSourceOrDict] = None,
      config: Optional[types.GenerateVideosConfigOrDict] = None,
  ) -> types.GenerateVideosOperation:
    """Generates videos based on an input (text, image, or video) and configuration.

    The following use cases are supported:
    1. Text to video generation.
    2a. Image to video generation (additional text prompt is optional).
    2b. Image to video generation with frame interpolation (specify last_frame
    in config).
    3. Video extension (additional text prompt is optional)

    Outside Vertex mode, when an input video carries both a ``uri`` and
    ``video_bytes``, only the ``uri`` and ``mime_type`` are forwarded. This
    applies to the ``video`` argument and to the video nested inside
    ``source``, whether they are given as objects or as dicts.

    Args:
      model: The model to use.
      prompt: The text prompt for generating the videos. Optional for image to
        video and video extension use cases. This argument is deprecated, please
        use source instead.
      image: The input image for generating the videos. Optional if prompt is
        provided. This argument is deprecated, please use source instead.
      video: The input video for video extension use cases. Optional if prompt
        or image is provided. This argument is deprecated, please use source
        instead.
      source: The input source for generating the videos (prompt, image, and/or
        video)
      config: Configuration for generation.

    Returns:
      The long-running ``GenerateVideosOperation`` for the request.

    Raises:
      ValueError: If ``source`` is combined with any of ``prompt``, ``image``
        or ``video``.

    Usage:

      ```
      operation = client.models.generate_videos(
          model="veo-2.0-generate-001",
          source=types.GenerateVideosSource(
              prompt="A neon hologram of a cat driving at top speed",
          ),
      )
      while not operation.done:
          time.sleep(10)
          operation = client.operations.get(operation)

      operation.result.generated_videos[0].video.uri
      ```
    """
    if (prompt or image or video) and source:
      raise ValueError(
          'Source and prompt/image/video are mutually exclusive.'
          + ' Please only use source.'
      )
    # Gemini Developer API does not support video bytes.
    video_dct: dict[str, Any] = {}
    if not self._api_client.vertexai and video:
      if isinstance(video, types.Video):
        video_dct = video.model_dump()
      else:
        video_dct = dict(video)

      if video_dct.get('uri') and video_dct.get('video_bytes'):
        video = types.Video(
            uri=video_dct.get('uri'), mime_type=video_dct.get('mime_type')
        )
    elif not self._api_client.vertexai and source:
      if isinstance(source, types.GenerateVideosSource):
        source_dct = source.model_dump()
      else:
        source_dct = dict(source)

      # The nested video may be a Video object (dict source), a plain dict
      # (dict source or dumped model), or absent. Normalize all of them so the
      # bytes are stripped consistently.
      source_video = source_dct.get('video')
      if isinstance(source_video, types.Video):
        video_dct = source_video.model_dump()
      elif isinstance(source_video, dict):
        video_dct = dict(source_video)

      if video_dct.get('uri') and video_dct.get('video_bytes'):
        source = types.GenerateVideosSource(
            prompt=source_dct.get('prompt'),
            image=source_dct.get('image'),
            video=types.Video(
                uri=video_dct.get('uri'),
                mime_type=video_dct.get('mime_type'),
            ),
        )
    return self._generate_videos(
        model=model,
        prompt=prompt,
        image=image,
        video=video,
        source=source,
        config=config,
    )

  def list(
      self,
      *,
      config: Optional[types.ListModelsConfigOrDict] = None,
  ) -> Pager[types.Model]:
    """Makes an API request to list the available models.

    If `query_base` is set to True in the config or not set (default), the
    API will return all available base models. If set to False, it will return
    all tuned models.

    The config passed in is never modified: defaults and the Vertex tuned-model
    filter are applied to a copy.

    Args:
      config (ListModelsConfigOrDict): Configuration for retrieving models.

    Returns:
      A ``Pager`` over the listed models, initialized with the first page.

    Usage:

    .. code-block:: python

      response=client.models.list(config={'page_size': 5})
      print(response.page)
      # [Model(name='projects/./locations/./models/123', display_name='my_model'

      response=client.models.list(config={'page_size': 5, 'query_base': True})
      print(response.page)
      # [Model(name='publishers/google/models/gemini-2.0-flash-exp' ...
    """

    # Copy before touching any field, in case parameter validation hands back
    # the caller's own ListModelsConfig instance.
    config = (
        types._ListModelsParameters(config=config).config
        or types.ListModelsConfig()
    ).copy()
    if config.query_base is None:
      config.query_base = True
    if self._api_client.vertexai and not config.query_base:
      # Filter for tuning jobs artifacts by labels.
      filter_value = config.filter
      config.filter = (
          filter_value + '&filter=labels.tune-type:*'
          if filter_value
          else 'labels.tune-type:*'
      )
    return Pager(
        'models',
        self._list,
        self._list(config=config),
        config,
    )


class AsyncModels(_api_module.BaseModule):

  async def _generate_content(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> types.GenerateContentResponse:
    """Sends one non-streaming `generateContent` request.

    Args:
      model (str): The model to use.
      contents: The contents to send to the model.
      config: Optional request configuration, as a typed config or a dict.

    Returns:
      The parsed GenerateContentResponse. When the config enables
      `should_return_http_response`, a response carrying only the raw HTTP
      response in `sdk_http_response` is returned instead.
    """
    parameter_model = types._GenerateContentParameters(
        model=model,
        contents=contents,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _GenerateContentParameters_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GenerateContentParameters_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )
    request_url_dict = request_dict.get('_url')
    path = '{model}:generateContent'
    if request_url_dict:
      path = path.format_map(request_url_dict)
    query_params = request_dict.get('_query')
    if query_params:
      path = f'{path}?{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    # Read the flag from the validated parameter model, not the raw `config`
    # argument: `getattr` on a plain dict never finds the key, so the option
    # used to be silently ignored for callers passing a dict config.
    if getattr(parameter_model.config, 'should_return_http_response', None):
      return_value = types.GenerateContentResponse(sdk_http_response=response)
      self._api_client._verify_response(return_value)
      return return_value

    response_dict = {} if not response.body else json.loads(response.body)

    if self._api_client.vertexai:
      response_dict = _GenerateContentResponse_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _GenerateContentResponse_from_mldev(
          response_dict, None, parameter_model
      )

    return_value = types.GenerateContentResponse._from_response(
        response=response_dict,
        kwargs={
            'config': {
                'response_schema': getattr(
                    parameter_model.config, 'response_schema', None
                ),
                'response_json_schema': getattr(
                    parameter_model.config, 'response_json_schema', None
                ),
                'include_all_fields': getattr(
                    parameter_model.config, 'include_all_fields', None
                ),
            }
        }
        if getattr(parameter_model, 'config', None)
        else {},
    )
    # Only the headers of the HTTP response are attached to the parsed result.
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def _generate_content_stream(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> Awaitable[AsyncIterator[types.GenerateContentResponse]]:
    """Sends a `streamGenerateContent` request and yields parsed chunks.

    Args:
      model (str): The model to use.
      contents: The contents to send to the model.
      config: Optional request configuration, as a typed config or a dict.

    Returns:
      An async generator that yields one GenerateContentResponse per item
      received from the streamed HTTP response.

    Raises:
      ValueError: If the config enables `should_return_http_response`, which
        streaming methods do not support.
    """
    parameter_model = types._GenerateContentParameters(
        model=model,
        contents=contents,
        config=config,
    )

    request_url_dict: Optional[dict[str, str]]

    if self._api_client.vertexai:
      request_dict = _GenerateContentParameters_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GenerateContentParameters_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )
    request_url_dict = request_dict.get('_url')
    path = '{model}:streamGenerateContent?alt=sse'
    if request_url_dict:
      path = path.format_map(request_url_dict)
    query_params = request_dict.get('_query')
    if query_params:
      # The path already carries the `?alt=sse` query string, so further
      # parameters must be joined with `&`; a second `?` would produce a
      # malformed URL.
      path = f'{path}&{urlencode(query_params)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = None
    if (
        parameter_model.config is not None
        and parameter_model.config.http_options is not None
    ):
      http_options = parameter_model.config.http_options

    request_dict = _common.convert_to_dict(request_dict)
    request_dict = _common.encode_unserializable_types(request_dict)

    # Check the validated parameter model so that dict configs are rejected
    # the same way typed configs are; `getattr` on a raw dict never finds
    # the key.
    if getattr(parameter_model.config, 'should_return_http_response', None):
      raise ValueError(
          'Accessing the raw HTTP response is not supported in streaming'
          ' methods.'
      )

    response_stream = await self._api_client.async_request_streamed(
        'post', path, request_dict, http_options
    )

    async def async_generator():  # type: ignore[no-untyped-def]
      async for response in response_stream:

        response_dict = {} if not response.body else json.loads(response.body)

        if self._api_client.vertexai:
          response_dict = _GenerateContentResponse_from_vertex(
              response_dict, None, parameter_model
          )
        else:
          response_dict = _GenerateContentResponse_from_mldev(
              response_dict, None, parameter_model
          )

        return_value = types.GenerateContentResponse._from_response(
            response=response_dict,
            kwargs={
                'config': {
                    'response_schema': getattr(
                        parameter_model.config, 'response_schema', None
                    ),
                    'response_json_schema': getattr(
                        parameter_model.config, 'response_json_schema', None
                    ),
                    'include_all_fields': getattr(
                        parameter_model.config, 'include_all_fields', None
                    ),
                }
            }
            if getattr(parameter_model, 'config', None)
            else {},
        )
        # Only the headers of each streamed item are attached to the result.
        return_value.sdk_http_response = types.HttpResponse(
            headers=response.headers
        )
        self._api_client._verify_response(return_value)
        yield return_value

    return async_generator()  # type: ignore[no-untyped-call, no-any-return]

  async def _embed_content(
      self,
      *,
      model: str,
      contents: Optional[
          Union[types.ContentListUnion, types.ContentListUnionDict]
      ] = None,
      content: Optional[
          Union[types.ContentUnion, types.ContentUnionDict]
      ] = None,
      embedding_api_type: Optional[types.EmbeddingApiType] = None,
      config: Optional[types.EmbedContentConfigOrDict] = None,
  ) -> types.EmbedContentResponse:
    """Calculates embeddings for the given contents. Only text is supported.

    Args:
      model (str): The model to use.
      contents (list[Content]): The contents to embed.
      content: A single content item, passed through to the request
        parameters as-is.
      embedding_api_type: Passed through to the request parameters as-is.
      config (EmbedContentConfig): Optional configuration for embeddings.

    Returns:
      The parsed EmbedContentResponse, with the HTTP response headers
      attached as `sdk_http_response`.

    Usage:

    .. code-block:: python

      embeddings = await client.aio.models.embed_content(
          model= 'text-embedding-004',
          contents=[
              'What is your name?',
              'What is your favorite color?',
          ],
          config={
              'output_dimensionality': 64
          },
      )
    """

    parameter_model = types._EmbedContentParametersPrivate(
        model=model,
        contents=contents,
        content=content,
        embedding_api_type=embedding_api_type,
        config=config,
    )

    if self._api_client.vertexai:
      request_dict = _EmbedContentParametersPrivate_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
      # On Vertex the endpoint depends on the model: either the dedicated
      # embedContent method or the generic predict method.
      if t.t_is_vertex_embed_content_model(parameter_model.model):  # type: ignore[arg-type]
        path = '{model}:embedContent'
      else:
        path = '{model}:predict'
    else:
      request_dict = _EmbedContentParametersPrivate_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )
      path = '{model}:batchEmbedContents'

    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)
    extra_query = request_dict.get('_query')
    if extra_query:
      path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _EmbedContentResponse_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _EmbedContentResponse_from_mldev(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.EmbedContentResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def _generate_images(
      self,
      *,
      model: str,
      prompt: str,
      config: Optional[types.GenerateImagesConfigOrDict] = None,
  ) -> types.GenerateImagesResponse:
    """Private method for generating images asynchronously.

    Args:
      model (str): The model to use.
      prompt (str): The prompt placed into the request parameters.
      config: Optional configuration, as a typed config or a dict.

    Returns:
      The parsed GenerateImagesResponse, with the HTTP response headers
      attached as `sdk_http_response`.
    """

    parameter_model = types._GenerateImagesParameters(
        model=model,
        prompt=prompt,
        config=config,
    )

    # Both backends use the same `predict` path template; only the request
    # converter differs.
    if self._api_client.vertexai:
      request_dict = _GenerateImagesParameters_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GenerateImagesParameters_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )

    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)
    extra_query = request_dict.get('_query')
    if extra_query:
      path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _GenerateImagesResponse_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _GenerateImagesResponse_from_mldev(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.GenerateImagesResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def _edit_image(
      self,
      *,
      model: str,
      prompt: str,
      reference_images: list[types._ReferenceImageAPIOrDict],
      config: Optional[types.EditImageConfigOrDict] = None,
  ) -> types.EditImageResponse:
    """Private method for editing an image asynchronously.

    Args:
      model (str): The model to use.
      prompt (str): The prompt placed into the request parameters.
      reference_images: Reference images placed into the request parameters.
      config: Optional configuration, as a typed config or a dict.

    Returns:
      The parsed EditImageResponse, with the HTTP response headers attached
      as `sdk_http_response`.

    Raises:
      ValueError: If `self._api_client.vertexai` is falsy; this method has no
        Gemini Developer API implementation.
    """

    parameter_model = types._EditImageParameters(
        model=model,
        prompt=prompt,
        reference_images=reference_images,
        config=config,
    )

    # Guard clause: the parameters are validated first, then the backend is
    # checked, matching the order callers already observe.
    if not self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _EditImageParameters_to_vertex(
        self._api_client, parameter_model, None, parameter_model
    )
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    extra_query = request_dict.get('_query')
    if extra_query:
      path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _EditImageResponse_from_vertex(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.EditImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def _upscale_image(
      self,
      *,
      model: str,
      image: types.ImageOrDict,
      upscale_factor: str,
      config: Optional[types._UpscaleImageAPIConfigOrDict] = None,
  ) -> types.UpscaleImageResponse:
    """Private method for upscaling an image asynchronously.

    Args:
      model (str): The model to use.
      image: The image placed into the request parameters.
      upscale_factor (str): The upscale factor placed into the request
        parameters.
      config: Optional configuration, as a typed config or a dict.

    Returns:
      The parsed UpscaleImageResponse, with the HTTP response headers
      attached as `sdk_http_response`.

    Raises:
      ValueError: If `self._api_client.vertexai` is falsy; this method has no
        Gemini Developer API implementation.
    """

    parameter_model = types._UpscaleImageAPIParameters(
        model=model,
        image=image,
        upscale_factor=upscale_factor,
        config=config,
    )

    # Guard clause: the parameters are validated first, then the backend is
    # checked, matching the order callers already observe.
    if not self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _UpscaleImageAPIParameters_to_vertex(
        self._api_client, parameter_model, None, parameter_model
    )
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    extra_query = request_dict.get('_query')
    if extra_query:
      path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _UpscaleImageResponse_from_vertex(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.UpscaleImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def recontext_image(
      self,
      *,
      model: str,
      source: types.RecontextImageSourceOrDict,
      config: Optional[types.RecontextImageConfigOrDict] = None,
  ) -> types.RecontextImageResponse:
    """Recontextualizes an image.

    One kind of recontextualization is currently supported:
    1) Virtual Try-On: Generate images of persons modeling fashion products.

    Args:
      model (str): The model to use.
      source (RecontextImageSource): The source inputs (prompt, person_image,
        product_images) for image recontext. prompt is behind an allowlist.
        person_image and product_images are both required, and only one
        product image is supported currently.
      config (RecontextImageConfig): Configuration for recontextualization.

    Returns:
      The parsed RecontextImageResponse.

    Raises:
      ValueError: If `self._api_client.vertexai` is falsy; this method has no
        Gemini Developer API implementation.

    Usage:

      ```
      virtual_try_on_response = await client.aio.models.recontext_image(
          model="virtual-try-on-001",
          source=types.RecontextImageSource(
              person_image=types.Image.from_file(location=IMAGE1_FILE_PATH),
              product_images=[types.ProductImage(product_image=
                  types.Image.from_file(location=IMAGE2_FILE_PATH)
              )],
          ),
          config=types.RecontextImageConfig(
              number_of_images=1,
          ),
      )
      image = virtual_try_on_response.generated_images[0].image
      ```
    """

    parameter_model = types._RecontextImageParameters(
        model=model,
        source=source,
        config=config,
    )

    # Guard clause: the parameters are validated first, then the backend is
    # checked, matching the order callers already observe.
    if not self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _RecontextImageParameters_to_vertex(
        self._api_client, parameter_model, None, parameter_model
    )
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    extra_query = request_dict.get('_query')
    if extra_query:
      path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _RecontextImageResponse_from_vertex(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.RecontextImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )

    self._api_client._verify_response(return_value)
    return return_value

  async def segment_image(
      self,
      *,
      model: str,
      source: types.SegmentImageSourceOrDict,
      config: Optional[types.SegmentImageConfigOrDict] = None,
  ) -> types.SegmentImageResponse:
    """Segments an image, creating a mask of a specified area.

    Args:
      model (str): The model to use.
      source (SegmentImageSource): The source inputs (prompt, image,
        scribble_image) for image segmentation. The prompt is required for
        prompt mode and semantic mode and disallowed for other modes.
        scribble_image is required for the interactive mode and disallowed
        for other modes.
      config (SegmentImageConfig): Configuration for segmentation.

    Returns:
      The parsed SegmentImageResponse.

    Raises:
      ValueError: If `self._api_client.vertexai` is falsy; this method has no
        Gemini Developer API implementation.

    Usage:

      ```
      response = await client.aio.models.segment_image(
          model="image-segmentation-001",
          source=types.SegmentImageSource(
              image=types.Image.from_file(location=IMAGE_FILE_PATH),
          ),
          config=types.SegmentImageConfig(
              mode=types.SegmentMode.foreground,
          ),
      )

      mask_image = response.generated_masks[0].mask
      ```
    """

    parameter_model = types._SegmentImageParameters(
        model=model,
        source=source,
        config=config,
    )

    # Guard clause: the parameters are validated first, then the backend is
    # checked, matching the order callers already observe.
    if not self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    request_dict = _SegmentImageParameters_to_vertex(
        self._api_client, parameter_model, None, parameter_model
    )
    path = '{model}:predict'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    extra_query = request_dict.get('_query')
    if extra_query:
      path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _SegmentImageResponse_from_vertex(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.SegmentImageResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )

    self._api_client._verify_response(return_value)
    return return_value

  async def get(
      self, *, model: str, config: Optional[types.GetModelConfigOrDict] = None
  ) -> types.Model:
    """Issues a GET request for a single model and parses the response.

    Args:
      model (str): The model to retrieve.
      config: Optional configuration, as a typed config or a dict.

    Returns:
      The parsed Model.
    """
    parameter_model = types._GetModelParameters(
        model=model,
        config=config,
    )

    # Both backends use the same `{name}` path template; only the request
    # converter differs.
    if self._api_client.vertexai:
      request_dict = _GetModelParameters_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _GetModelParameters_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )

    path = '{name}'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)
    extra_query = request_dict.get('_query')
    if extra_query:
      path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _Model_from_vertex(response_dict, None, parameter_model)
    else:
      response_dict = _Model_from_mldev(response_dict, None, parameter_model)

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.Model._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )

    self._api_client._verify_response(return_value)
    return return_value

  async def _list(
      self, *, config: Optional[types.ListModelsConfigOrDict] = None
  ) -> types.ListModelsResponse:
    """Issues one list-models GET request and parses the response.

    Args:
      config: Optional configuration, as a typed config or a dict.

    Returns:
      The parsed ListModelsResponse, with the HTTP response headers attached
      as `sdk_http_response`.
    """
    parameter_model = types._ListModelsParameters(
        config=config,
    )

    # Both backends use the same `{models_url}` path template; only the
    # request converter differs.
    if self._api_client.vertexai:
      request_dict = _ListModelsParameters_to_vertex(
          self._api_client, parameter_model, None, parameter_model
      )
    else:
      request_dict = _ListModelsParameters_to_mldev(
          self._api_client, parameter_model, None, parameter_model
      )

    path = '{models_url}'
    url_params: Optional[dict[str, str]] = request_dict.get('_url')
    if url_params:
      path = path.format_map(url_params)

    extra_query = request_dict.get('_query')
    if extra_query:
      if extra_query.get('filter'):
        # The filter value is placed in the URL verbatim (it is not passed
        # through urlencode); the remaining parameters are encoded normally.
        raw_filter = extra_query.pop('filter')
        path = f'{path}?filter={raw_filter}'
        if extra_query:
          path += f'&{urlencode(extra_query)}'
      else:
        path = f'{path}?{urlencode(extra_query)}'
    # TODO: remove the hack that pops config.
    request_dict.pop('config', None)

    http_options: Optional[types.HttpOptions] = (
        parameter_model.config.http_options
        if parameter_model.config is not None
        else None
    )

    request_dict = _common.encode_unserializable_types(
        _common.convert_to_dict(request_dict)
    )

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )

    response_dict = json.loads(response.body) if response.body else {}

    if self._api_client.vertexai:
      response_dict = _ListModelsResponse_from_vertex(
          response_dict, None, parameter_model
      )
    else:
      response_dict = _ListModelsResponse_from_mldev(
          response_dict, None, parameter_model
      )

    # Forward the schema-related config options to the response parser, but
    # only when a config is present.
    parse_kwargs: dict[str, Any] = {}
    if getattr(parameter_model, 'config', None):
      parse_kwargs['config'] = {
          option: getattr(parameter_model.config, option, None)
          for option in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    return_value = types.ListModelsResponse._from_response(
        response=response_dict,
        kwargs=parse_kwargs,
    )
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
    self._api_client._verify_response(return_value)
    return return_value

  async def update(
      self,
      *,
      model: str,
      config: Optional[types.UpdateModelConfigOrDict] = None,
  ) -> types.Model:
    """Issues a PATCH request for a model and parses the response.

    The request is built with the Vertex or ML Dev converter depending on
    `self._api_client.vertexai`, and the response body is converted back with
    the matching `_Model_from_*` converter.

    Args:
      model (str): Forwarded as the `model` field of the request parameters.
      config (UpdateModelConfig): Optional request configuration. When it
        carries `http_options`, they are passed to the HTTP call.

    Returns:
      The `types.Model` built from the response body (an empty body is
      treated as an empty dict).
    """
    params = types._UpdateModelParameters(model=model, config=config)
    on_vertex = self._api_client.vertexai

    # The two backends use different converters and URL placeholders.
    if on_vertex:
      body = _UpdateModelParameters_to_vertex(
          self._api_client, params, None, params
      )
      template = '{model}'
    else:
      body = _UpdateModelParameters_to_mldev(
          self._api_client, params, None, params
      )
      template = '{name}'

    # Fill in the placeholder only when the converter produced URL params.
    url_params: Optional[dict[str, str]] = body.get('_url')
    path = template.format_map(url_params) if url_params else template
    query = body.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    body.pop('config', None)

    request_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if request_config is None else request_config.http_options
    )

    body = _common.encode_unserializable_types(_common.convert_to_dict(body))

    response = await self._api_client.async_request(
        'patch', path, body, http_options
    )

    payload = json.loads(response.body) if response.body else {}
    if on_vertex:
      payload = _Model_from_vertex(payload, None, params)
    else:
      payload = _Model_from_mldev(payload, None, params)

    # Response-parsing options are forwarded only when a config was given.
    parse_config = getattr(params, 'config', None)
    parse_kwargs: dict[str, Any] = {}
    if parse_config:
      parse_kwargs['config'] = {
          field: getattr(parse_config, field, None)
          for field in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    result = types.Model._from_response(response=payload, kwargs=parse_kwargs)

    self._api_client._verify_response(result)
    return result

  async def delete(
      self,
      *,
      model: str,
      config: Optional[types.DeleteModelConfigOrDict] = None,
  ) -> types.DeleteModelResponse:
    """Issues a DELETE request for a model and parses the response.

    Args:
      model (str): Forwarded as the `model` field of the request parameters.
      config (DeleteModelConfig): Optional request configuration. When it
        carries `http_options`, they are passed to the HTTP call.

    Returns:
      The `types.DeleteModelResponse` built from the response body, with
      `sdk_http_response` set to the headers of the HTTP response.
    """
    params = types._DeleteModelParameters(model=model, config=config)
    on_vertex = self._api_client.vertexai

    if on_vertex:
      body = _DeleteModelParameters_to_vertex(
          self._api_client, params, None, params
      )
    else:
      body = _DeleteModelParameters_to_mldev(
          self._api_client, params, None, params
      )

    # Both backends address the resource through the same placeholder.
    template = '{name}'
    url_params: Optional[dict[str, str]] = body.get('_url')
    path = template.format_map(url_params) if url_params else template
    query = body.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    body.pop('config', None)

    request_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if request_config is None else request_config.http_options
    )

    body = _common.encode_unserializable_types(_common.convert_to_dict(body))

    response = await self._api_client.async_request(
        'delete', path, body, http_options
    )

    payload = json.loads(response.body) if response.body else {}
    if on_vertex:
      payload = _DeleteModelResponse_from_vertex(payload, None, params)
    else:
      payload = _DeleteModelResponse_from_mldev(payload, None, params)

    # Response-parsing options are forwarded only when a config was given.
    parse_config = getattr(params, 'config', None)
    parse_kwargs: dict[str, Any] = {}
    if parse_config:
      parse_kwargs['config'] = {
          field: getattr(parse_config, field, None)
          for field in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    result = types.DeleteModelResponse._from_response(
        response=payload, kwargs=parse_kwargs
    )
    result.sdk_http_response = types.HttpResponse(headers=response.headers)
    self._api_client._verify_response(result)
    return result

  async def count_tokens(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.CountTokensConfigOrDict] = None,
  ) -> types.CountTokensResponse:
    """Counts the tokens in the given content.

    Multimodal input is supported for Gemini models.

    Args:
      model (str): The model to use for counting tokens.
      contents (list[types.Content]): The content whose tokens are counted.
      config (CountTokensConfig): Optional configuration for the request.
        When it carries `http_options`, they are passed to the HTTP call.

    Returns:
      The `types.CountTokensResponse` built from the response body, with
      `sdk_http_response` set to the headers of the HTTP response.

    Usage:

    .. code-block:: python

      response = await client.aio.models.count_tokens(
          model='gemini-2.0-flash',
          contents='What is your name?',
      )
      print(response)
      # total_tokens=5 cached_content_token_count=None
    """
    params = types._CountTokensParameters(
        model=model, contents=contents, config=config
    )
    on_vertex = self._api_client.vertexai

    if on_vertex:
      body = _CountTokensParameters_to_vertex(
          self._api_client, params, None, params
      )
    else:
      body = _CountTokensParameters_to_mldev(
          self._api_client, params, None, params
      )

    # Both backends share the same path template.
    template = '{model}:countTokens'
    url_params: Optional[dict[str, str]] = body.get('_url')
    path = template.format_map(url_params) if url_params else template
    query = body.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    body.pop('config', None)

    request_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if request_config is None else request_config.http_options
    )

    body = _common.encode_unserializable_types(_common.convert_to_dict(body))

    response = await self._api_client.async_request(
        'post', path, body, http_options
    )

    payload = json.loads(response.body) if response.body else {}
    if on_vertex:
      payload = _CountTokensResponse_from_vertex(payload, None, params)
    else:
      payload = _CountTokensResponse_from_mldev(payload, None, params)

    # Response-parsing options are forwarded only when a config was given.
    parse_config = getattr(params, 'config', None)
    parse_kwargs: dict[str, Any] = {}
    if parse_config:
      parse_kwargs['config'] = {
          field: getattr(parse_config, field, None)
          for field in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    result = types.CountTokensResponse._from_response(
        response=payload, kwargs=parse_kwargs
    )
    result.sdk_http_response = types.HttpResponse(headers=response.headers)
    self._api_client._verify_response(result)
    return result

  async def compute_tokens(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.ComputeTokensConfigOrDict] = None,
  ) -> types.ComputeTokensResponse:
    """Returns the tokens and token ids for the given contents.

    Args:
      model (str): The model to use.
      contents (list[types.Content]): The content to compute tokens for.
      config (ComputeTokensConfig): Optional configuration for the request.
        When it carries `http_options`, they are passed to the HTTP call.

    Returns:
      The `types.ComputeTokensResponse` built from the response body, with
      `sdk_http_response` set to the headers of the HTTP response.

    Raises:
      ValueError: If the client is not in Vertex (Agent Platform) mode.

    Usage:

    .. code-block:: python

      response = await client.aio.models.compute_tokens(
          model='gemini-2.0-flash',
          contents='What is your name?',
      )
      print(response)
      # tokens_info=[TokensInfo(role='user', token_ids=['1841', ...],
      # tokens=[b'What', b' is', b' your', b' name', b'?'])]
    """
    # Parameters are built first so their validation precedes the mode check.
    params = types._ComputeTokensParameters(
        model=model, contents=contents, config=config
    )

    if not self._api_client.vertexai:
      raise ValueError(
          'This method is only supported in Gemini Enterprise Agent Platform'
          ' mode, not in Gemini Developer API mode.'
      )

    body = _ComputeTokensParameters_to_vertex(
        self._api_client, params, None, params
    )
    template = '{model}:computeTokens'
    url_params: Optional[dict[str, str]] = body.get('_url')
    path = template.format_map(url_params) if url_params else template

    query = body.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    body.pop('config', None)

    request_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if request_config is None else request_config.http_options
    )

    body = _common.encode_unserializable_types(_common.convert_to_dict(body))

    response = await self._api_client.async_request(
        'post', path, body, http_options
    )

    payload = json.loads(response.body) if response.body else {}
    # Only the Vertex backend reaches this point (see the guard above).
    payload = _ComputeTokensResponse_from_vertex(payload, None, params)

    # Response-parsing options are forwarded only when a config was given.
    parse_config = getattr(params, 'config', None)
    parse_kwargs: dict[str, Any] = {}
    if parse_config:
      parse_kwargs['config'] = {
          field: getattr(parse_config, field, None)
          for field in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    result = types.ComputeTokensResponse._from_response(
        response=payload, kwargs=parse_kwargs
    )
    result.sdk_http_response = types.HttpResponse(headers=response.headers)
    self._api_client._verify_response(result)
    return result

  async def _generate_videos(
      self,
      *,
      model: str,
      prompt: Optional[str] = None,
      image: Optional[types.ImageOrDict] = None,
      video: Optional[types.VideoOrDict] = None,
      source: Optional[types.GenerateVideosSourceOrDict] = None,
      config: Optional[types.GenerateVideosConfigOrDict] = None,
  ) -> types.GenerateVideosOperation:
    """Private method that starts a video generation request asynchronously.

    Posts to the `:predictLongRunning` endpoint of the model and returns the
    operation parsed from the response body.

    Args:
      model (str): Forwarded as the `model` field of the request parameters.
      prompt (str): Optional; forwarded to the request parameters.
      image (Image): Optional; forwarded to the request parameters.
      video (Video): Optional; forwarded to the request parameters.
      source (GenerateVideosSource): Optional; forwarded to the request
        parameters.
      config (GenerateVideosConfig): Optional request configuration. When it
        carries `http_options`, they are passed to the HTTP call.

    Returns:
      The `types.GenerateVideosOperation` built from the response body.
    """
    params = types._GenerateVideosParameters(
        model=model,
        prompt=prompt,
        image=image,
        video=video,
        source=source,
        config=config,
    )
    on_vertex = self._api_client.vertexai

    if on_vertex:
      body = _GenerateVideosParameters_to_vertex(
          self._api_client, params, None, params
      )
    else:
      body = _GenerateVideosParameters_to_mldev(
          self._api_client, params, None, params
      )

    # Both backends share the same path template.
    template = '{model}:predictLongRunning'
    url_params: Optional[dict[str, str]] = body.get('_url')
    path = template.format_map(url_params) if url_params else template
    query = body.get('_query')
    if query:
      path = f'{path}?{urlencode(query)}'
    # TODO: remove the hack that pops config.
    body.pop('config', None)

    request_config = params.config
    http_options: Optional[types.HttpOptions] = (
        None if request_config is None else request_config.http_options
    )

    body = _common.encode_unserializable_types(_common.convert_to_dict(body))

    response = await self._api_client.async_request(
        'post', path, body, http_options
    )

    payload = json.loads(response.body) if response.body else {}
    if on_vertex:
      payload = _GenerateVideosOperation_from_vertex(payload, None, params)
    else:
      payload = _GenerateVideosOperation_from_mldev(payload, None, params)

    # Response-parsing options are forwarded only when a config was given.
    parse_config = getattr(params, 'config', None)
    parse_kwargs: dict[str, Any] = {}
    if parse_config:
      parse_kwargs['config'] = {
          field: getattr(parse_config, field, None)
          for field in (
              'response_schema',
              'response_json_schema',
              'include_all_fields',
          )
      }

    result = types.GenerateVideosOperation._from_response(
        response=payload, kwargs=parse_kwargs
    )

    self._api_client._verify_response(result)
    return result

  async def generate_content(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> types.GenerateContentResponse:
    """Makes an API request to generate content using a model.

    Some models support multimodal input and output.

    Built-in MCP support is an experimental feature.

    When automatic function calling (AFC) is not disabled and all tools are
    AFC-compatible, this method repeatedly calls the model, executes the
    requested functions, and feeds the function responses back to the model
    until no function response is produced or the maximum number of remote
    calls is reached.

    Args:
      model (str): The model to use.
      contents: The content sent to the model.
      config (GenerateContentConfig): Optional configuration. A private copy
        is made, so the caller's object is not modified by the tool rewriting
        done here.

    Returns:
      The last `GenerateContentResponse` received from the model. When the AFC
      loop ran and `should_append_afc_history` is true for the config, its
      `automatic_function_calling_history` is populated.

    Raises:
      ValueError: In Vertex (Agent Platform) mode, if an MCP server sets
        `streamable_http_transport` or has no `name`.

    Usage:

    .. code-block:: python

      from google.genai import types
      from google import genai

      client = genai.Client(
          vertexai=True, project='my-project-id', location='us-central1'
      )

      response = await client.aio.models.generate_content(
          model='gemini-2.0-flash',
          contents='User input: I like bagels. Answer:',
          config=types.GenerateContentConfig(
              system_instruction=
                [
                  'You are a helpful language translator.',
                  'Your mission is to translate text in English to French.'
                ]
          ),
      )
      print(response.text)
      # J'aime les bagels.
    """
    # Find the tools that cannot be used with automatic function calling
    # (AFC). This is computed from the caller's original `config`, before the
    # MCP tool rewriting below.
    incompatible_tools_indexes = (
        _extra_utils.find_afc_incompatible_tool_indexes(
            config,
            is_agent_platform=getattr(self._api_client, 'vertexai', False),
        )
    )

    # Normalize `config` into a private GenerateContentConfig (or None) so the
    # tool list can be replaced below without touching the caller's object.
    if not config:
      parsed_config = None
    elif isinstance(config, dict):
      parsed_config = types.GenerateContentConfig(**config)
    else:
      parsed_config = config.model_copy(deep=True)

    # Use AsyncExitStack to keep MCP connections alive across the entire AFC loop
    async with contextlib.AsyncExitStack() as stack:

      # Intercept Agent Platform MCP servers and open connections
      if (
          self._api_client.vertexai
          and _extra_utils.has_agent_platform_mcp_servers(parsed_config)
          and parsed_config is not None
      ):
        # Rebuilt tool list: MCP server entries are replaced by the sessions
        # opened for them; every other tool is carried over unchanged.
        new_tools: list[Any] = []
        if parsed_config.tools:
          for tool in parsed_config.tools:
            if isinstance(tool, types.Tool) and tool.mcp_servers:
              # Only keep the tool if it has fields besides mcp_servers
              # NOTE(review): only the five fields below are checked; a tool
              # that sets mcp_servers plus some other field would be dropped
              # here — confirm this list covers every relevant Tool field.
              if (
                  tool.function_declarations
                  or tool.google_search
                  or tool.retrieval
                  or tool.google_search_retrieval
                  or tool.code_execution
              ):
                tool_copy = tool.model_copy(update={'mcp_servers': None})
                new_tools.append(tool_copy)

              for server in tool.mcp_servers:

                if (
                    getattr(server, 'streamable_http_transport', None)
                    is not None
                ):
                  raise ValueError(
                      "The 'streamable_http_transport' parameter is only"
                      ' supported in Gemini Developer API mode, not in Gemini'
                      ' Enterprise Agent Platform mode.'
                  )

                # Open the stream and tie its lifespan to the AsyncExitStack
                if server.name is not None:
                  session = await stack.enter_async_context(
                      _mcp_utils._connect_agent_platform_mcp(
                          self._api_client, server.name
                      )
                  )
                  new_tools.append(session)
                else:
                  raise ValueError(
                      "Agent Platform MCP servers require a 'name' field."
                  )
            else:
              new_tools.append(tool)
          parsed_config.tools = new_tools

      # Convert active sessions to tools and adapters
      final_parsed_config, mcp_to_genai_tool_adapters = (
          await _extra_utils.parse_config_for_mcp_sessions(
              parsed_config,
              is_agent_platform=getattr(self._api_client, 'vertexai', False),
          )
      )

      # AFC disabled by configuration: issue a single request.
      if _extra_utils.should_disable_afc(final_parsed_config):
        return await self._generate_content(
            model=model, contents=contents, config=final_parsed_config
        )

      # Some tools are AFC-incompatible: issue a single request without AFC.
      if incompatible_tools_indexes:
        # Count the tools in the caller's original config (object or dict).
        original_tools_length = 0
        if isinstance(config, types.GenerateContentConfig):
          if config.tools:
            original_tools_length = len(config.tools)
        elif isinstance(config, dict):
          tools = config.get('tools', [])
          if tools:
            original_tools_length = len(tools)
        # Warn only when the tool list is mixed, i.e. some but not all of the
        # tools are incompatible with AFC.
        if len(incompatible_tools_indexes) != original_tools_length:
          indices_str = ', '.join(map(str, incompatible_tools_indexes))
          logger.warning(
              'Tools at indices [%s] are not compatible with automatic function'
              ' calling (AFC). AFC is disabled. If AFC is intended, please'
              ' include python callables in the tool list, and do not include'
              ' function declaration and MCP server in the tool list.',
              indices_str,
          )
        return await self._generate_content(
            model=model, contents=contents, config=final_parsed_config
        )

      # AFC loop: each iteration is one remote call to the model.
      remaining_remote_calls_afc = _extra_utils.get_max_remote_calls_afc(
          final_parsed_config
      )
      logger.info(
          f'AFC is enabled with max remote calls: {remaining_remote_calls_afc}.'
      )
      automatic_function_calling_history: list[types.Content] = []
      # Placeholder returned as-is if the loop body never runs.
      response = types.GenerateContentResponse()

      while remaining_remote_calls_afc > 0:
        response = await self._generate_content(
            model=model, contents=contents, config=final_parsed_config
        )
        remaining_remote_calls_afc -= 1
        if remaining_remote_calls_afc == 0:
          logger.info(
              'Reached max remote calls for automatic function calling.'
          )

        function_map = _extra_utils.get_function_map(
            final_parsed_config,
            mcp_to_genai_tool_adapters,
            is_caller_method_async=True,
        )
        # Stop when there is nothing to call or nothing usable in the response.
        if not function_map:
          break
        if not response:
          break
        if (
            not response.candidates
            or not response.candidates[0].content
            or not response.candidates[0].content.parts
        ):
          break
        func_response_parts = (
            await _extra_utils.get_function_response_parts_async(
                response, function_map
            )
        )
        # No function responses were produced: this response is final.
        if not func_response_parts:
          break
        func_call_content = response.candidates[0].content
        func_response_content = types.Content(
            role='user',
            parts=func_response_parts,
        )
        contents = t.t_contents(contents)  # type: ignore[assignment]
        # Seed the history with the initial contents on the first round only.
        if not automatic_function_calling_history:
          automatic_function_calling_history.extend(contents)  # type: ignore[arg-type]
        # Append the model's function call and our function response so the
        # next request carries them.
        if isinstance(contents, list):
          contents.append(func_call_content)  # type: ignore[arg-type]
          contents.append(func_response_content)  # type: ignore[arg-type]
        automatic_function_calling_history.append(func_call_content)
        automatic_function_calling_history.append(func_response_content)

      if (
          _extra_utils.should_append_afc_history(final_parsed_config)
          and response is not None
      ):
        response.automatic_function_calling_history = (
            automatic_function_calling_history
        )

      return response

  async def generate_content_stream(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.GenerateContentConfigOrDict] = None,
  ) -> AsyncIterator[types.GenerateContentResponse]:
    """Makes an API request to generate content and yields the response in chunks.

    For the `model` parameter, supported formats for Gemini Enterprise Agent
    Platform API include:
    - The Gemini model ID, for example: 'gemini-2.0-flash'
    - The full resource name starts with 'projects/', for example:
      'projects/my-project-id/locations/us-central1/publishers/google/models/gemini-2.0-flash'
    - The partial resource name with 'publishers/', for example:
      'publishers/google/models/gemini-2.0-flash' or
    - `/` separated publisher and model name, for example:
      'google/gemini-2.0-flash'

    For the `model` parameter, supported formats for Gemini API include:
    - The Gemini model ID, for example: 'gemini-2.0-flash'
    - The model name starts with 'models/', for example:
      'models/gemini-2.0-flash'
    - For tuned models, the model name starts with 'tunedModels/',
      for example:
      'tunedModels/1234567890123456789'

    Some models support multimodal input and output.

    Built-in MCP support is an experimental feature.

    Args:
      model (str): The model to use (see the supported formats above).
      contents: The content sent to the model.
      config (GenerateContentConfig): Optional configuration for the request.

    Returns:
      An async generator of `GenerateContentResponse` chunks. This method is
      itself a coroutine, so it must be awaited before iterating (see Usage).

    Raises:
      NotImplementedError: In Vertex (Agent Platform) mode, if the config
        contains Agent Platform MCP servers.

    Usage:

    .. code-block:: python

      from google.genai import types
      from google import genai

      client = genai.Client(
          vertexai=True, project='my-project-id', location='us-central1'
      )

      async for chunk in await client.aio.models.generate_content_stream(
        model='gemini-2.0-flash',
        contents='''What is a good name for a flower shop that specializes in
          selling bouquets of dried flowers?'''
      ):
        print(chunk.text)
      # **Elegant & Classic:**
      # * The Dried Bloom
      # * Everlasting Florals
      # * Timeless Petals

      async for chunk in await client.aio.models.generate_content_stream(
        model='gemini-2.0-flash',
        contents=[
          types.Part.from_text('What is shown in this image?'),
          types.Part.from_uri('gs://generativeai-downloads/images/scones.jpg',
          'image/jpeg')
        ]
      ):
        print(chunk.text)
      # The image shows a flat lay arrangement of freshly baked blueberry
      # scones.
    """
    if getattr(
        self._api_client, 'vertexai', False
    ) and _extra_utils.has_agent_platform_mcp_servers(config):
      raise NotImplementedError(
          'MCP servers are not yet supported for streaming in the Agent'
          ' Platform API.'
      )

    # Find the tools that cannot be used with automatic function calling
    # (AFC), based on the caller's original `config`.
    incompatible_tools_indexes = (
        _extra_utils.find_afc_incompatible_tool_indexes(
            config,
            is_agent_platform=getattr(self._api_client, 'vertexai', False),
        )
    )
    # Retrieve and cache any MCP sessions if provided.
    parsed_config, mcp_to_genai_tool_adapters = (
        await _extra_utils.parse_config_for_mcp_sessions(
            config,
            is_agent_platform=getattr(self._api_client, 'vertexai', False),
        )
    )
    # AFC disabled by configuration: stream a single request as-is.
    if _extra_utils.should_disable_afc(parsed_config):
      response = await self._generate_content_stream(
          model=model, contents=contents, config=parsed_config
      )

      # The parameters are unused; the generator closes over `response`.
      async def base_async_generator(model, contents, config):  # type: ignore[no-untyped-def]
        async for chunk in response:  # type: ignore[attr-defined]
          yield chunk

      return base_async_generator(model, contents, parsed_config)  # type: ignore[no-untyped-call, no-any-return]

    # Some tools are AFC-incompatible: stream a single request without AFC.
    if incompatible_tools_indexes:
      # Count the tools in the caller's original config (object or dict).
      original_tools_length = 0
      if isinstance(config, types.GenerateContentConfig):
        if config.tools:
          original_tools_length = len(config.tools)
      elif isinstance(config, dict):
        tools = config.get('tools', [])
        if tools:
          original_tools_length = len(tools)
      # Warn only when the tool list is mixed, i.e. some but not all of the
      # tools are incompatible with AFC.
      if len(incompatible_tools_indexes) != original_tools_length:
        indices_str = ', '.join(map(str, incompatible_tools_indexes))
        logger.warning(
            'Tools at indices [%s] are not compatible with automatic function '
            'calling (AFC). AFC is disabled. If AFC is intended, please '
            'include python callables in the tool list, and do not include '
            'function declaration and MCP server in the tool list.',
            indices_str,
        )
      response = await self._generate_content_stream(
          model=model, contents=contents, config=parsed_config
      )

      # The parameters are unused; the generator closes over `response`.
      async def base_async_generator(model, contents, config):  # type: ignore[no-untyped-def]
        async for chunk in response:  # type: ignore[attr-defined]
          yield chunk

      return base_async_generator(model, contents, parsed_config)  # type: ignore[no-untyped-call, no-any-return]

    # With tool compatibility confirmed, validate that the configuration
    # options are compatible with each other and raise an error if invalid.
    _extra_utils.raise_error_for_afc_incompatible_config(parsed_config)

    # AFC streaming loop. Closes over `self` and `mcp_to_genai_tool_adapters`.
    async def async_generator(model, contents, config):  # type: ignore[no-untyped-def]
      remaining_remote_calls_afc = _extra_utils.get_max_remote_calls_afc(config)
      logger.info(
          f'AFC is enabled with max remote calls: {remaining_remote_calls_afc}.'
      )
      automatic_function_calling_history: list[types.Content] = []
      func_response_parts = None
      # Last chunk seen; stays None until a stream yields something.
      chunk = None
      # 1-based count of the requests made so far.
      i = 0
      while remaining_remote_calls_afc > 0:
        i += 1
        response = await self._generate_content_stream(
            model=model, contents=contents, config=config
        )
        # TODO: b/453739108 - make AFC logic more robust like the other 3 methods.
        if i > 1:
          logger.info(f'AFC remote call {i} is done.')
        remaining_remote_calls_afc -= 1
        if i > 1 and remaining_remote_calls_afc == 0:
          logger.info(
              'Reached max remote calls for automatic function calling.'
          )

        function_map = _extra_utils.get_function_map(
            config, mcp_to_genai_tool_adapters, is_caller_method_async=True
        )

        if i == 1:
          # First request gets a function call.
          # Then get function response parts.
          # Yield chunks only if there's no function response parts.
          async for chunk in response:  # type: ignore[attr-defined]
            if not function_map:
              contents = _extra_utils.append_chunk_contents(contents, chunk)
              yield chunk
            else:
              # A chunk without candidates/content/parts ends consumption of
              # this stream; that chunk is not yielded.
              if (
                  not chunk.candidates
                  or not chunk.candidates[0].content
                  or not chunk.candidates[0].content.parts
              ):
                break
              func_response_parts = (
                  await _extra_utils.get_function_response_parts_async(
                      chunk, function_map
                  )
              )
              if not func_response_parts:
                contents = _extra_utils.append_chunk_contents(contents, chunk)
                yield chunk

        else:
          #  Second request and beyond, yield chunks.
          async for chunk in response:  # type: ignore[attr-defined]

            if _extra_utils.should_append_afc_history(config):
              chunk.automatic_function_calling_history = (
                  automatic_function_calling_history
              )
            contents = _extra_utils.append_chunk_contents(contents, chunk)
            yield chunk
          # Only the last chunk of the stream is inspected for function calls.
          if (
              chunk is None
              or not chunk.candidates
              or not chunk.candidates[0].content
              or not chunk.candidates[0].content.parts
          ):
            break
          func_response_parts = (
              await _extra_utils.get_function_response_parts_async(
                  chunk, function_map
              )
          )
        # Stop when there is nothing to call or no function response was
        # produced.
        if not function_map:
          break

        if not func_response_parts:
          break

        # NOTE(review): appears to be a defensive guard (narrows `chunk` for
        # the attribute access below) — confirm whether it is reachable.
        if chunk is None:
          continue
        # Append function response parts to contents for the next request.
        func_call_content = chunk.candidates[0].content
        func_response_content = types.Content(
            role='user',
            parts=func_response_parts,
        )
        contents = t.t_contents(contents)
        # Seed the history with the initial contents on the first round only.
        if not automatic_function_calling_history:
          automatic_function_calling_history.extend(contents)
        if isinstance(contents, list) and func_call_content is not None:
          contents.append(func_call_content)
          contents.append(func_response_content)
        if func_call_content is not None:
          automatic_function_calling_history.append(func_call_content)
        automatic_function_calling_history.append(func_response_content)

    return async_generator(model, contents, parsed_config)  # type: ignore[no-untyped-call, no-any-return]

  async def edit_image(
      self,
      *,
      model: str,
      prompt: str,
      reference_images: list[types._ReferenceImageAPIOrDict],
      config: Optional[types.EditImageConfigOrDict] = None,
  ) -> types.EditImageResponse:
    """Edits an image from a text prompt and a set of reference images.

    This coroutine forwards every argument unchanged to the private
    `_edit_image` method and returns whatever that call produces.

    Args:
      model (str): The model to use.
      prompt (str): A text description of the edit to apply to the image.
      reference_images (list[Union[RawReferenceImage, MaskReferenceImage,
        ControlReferenceImage, StyleReferenceImage, SubjectReferenceImage]]):
        The reference images for editing.
      config (EditImageConfig): Configuration for editing.

    Returns:
      The `EditImageResponse` returned by the underlying request.

    Usage:

    .. code-block:: python

      from google.genai import types
      from google.genai.types import RawReferenceImage, MaskReferenceImage

      raw_ref_image = RawReferenceImage(
        reference_id=1,
        reference_image=types.Image.from_file(location=IMAGE_FILE_PATH),
      )

      mask_ref_image = MaskReferenceImage(
        reference_id=2,
        config=types.MaskReferenceConfig(
            mask_mode='MASK_MODE_FOREGROUND',
            mask_dilation=0.06,
        ),
      )
      response = await client.aio.models.edit_image(
        model='imagen-3.0-capability-001',
        prompt='man with dog',
        reference_images=[raw_ref_image, mask_ref_image],
        config=types.EditImageConfig(
            edit_mode='EDIT_MODE_INPAINT_INSERTION',
            number_of_images=1,
            include_rai_reason=True,
        ),
      )
      response.generated_images[0].image.show()
      # Shows a man with a dog instead of a cat.
    """
    edit_response = await self._edit_image(
        model=model,
        prompt=prompt,
        reference_images=reference_images,
        config=config,
    )
    return edit_response

  async def list(
      self,
      *,
      config: Optional[types.ListModelsConfigOrDict] = None,
  ) -> AsyncPager[types.Model]:
    """Makes an API request to list the available models.

    If `query_base` is set to True in the config or not set (default), the
    API will return all available base models. If set to False, it will return
    all tuned models.

    The supplied config is never modified: defaults and filter adjustments are
    applied to a private copy, which is also the config handed to the returned
    pager.

    Args:
      config (ListModelsConfigOrDict): Configuration for retrieving models.

    Returns:
      An `AsyncPager` over `Model` objects, primed with the first page.

    Usage:

    .. code-block:: python

      response = await client.aio.models.list(config={'page_size': 5})
      print(response.page)
      # [Model(name='projects/./locations/./models/123', display_name='my_model'

      response = await client.aio.models.list(
          config={'page_size': 5, 'query_base': True}
      )
      print(response.page)
      # [Model(name='publishers/google/models/gemini-2.0-flash-exp' ...
    """

    # The validated config may be the very object the caller passed in, so
    # copy it before applying defaults or rewriting the filter below.
    list_config = (
        types._ListModelsParameters(config=config).config
        or types.ListModelsConfig()
    ).model_copy()
    if list_config.query_base is None:
      list_config.query_base = True
    if self._api_client.vertexai and not list_config.query_base:
      # Filter for tuning jobs artifacts by labels.
      existing_filter = list_config.filter
      list_config.filter = (
          existing_filter + '&filter=labels.tune-type:*'
          if existing_filter
          else 'labels.tune-type:*'
      )
    return AsyncPager(
        'models',
        self._list,
        await self._list(config=list_config),
        list_config,
    )

  async def generate_images(
      self,
      *,
      model: str,
      prompt: str,
      config: Optional[types.GenerateImagesConfigOrDict] = None,
  ) -> types.GenerateImagesResponse:
    """Generates images from a text description and configuration.

    After the request completes, any returned entry whose safety attributes
    have the content type 'Positive Prompt' is removed from the image list and
    its safety attributes are exposed as `positive_prompt_safety_attributes`
    on the response instead. If the API response is empty or has no generated
    images, it is returned as-is.

    Args:
      model (str): The model to use.
      prompt (str): A text description of the images to generate.
      config (GenerateImagesConfig): Configuration for generation.

    Returns:
      A `GenerateImagesResponse` holding the remaining generated images and,
      when present, the positive prompt safety attributes.

    Usage:

    .. code-block:: python

      response = await client.aio.models.generate_images(
        model='imagen-3.0-generate-002',
        prompt='Man with a dog',
        config=types.GenerateImagesConfig(
            number_of_images=1,
            include_rai_reason=True,
        )
      )
      response.generated_images[0].image.show()
      # Shows a man with a dog.
    """
    api_response = await self._generate_images(
        model=model,
        prompt=prompt,
        config=config,
    )
    # Nothing to post-process: hand the raw response straight back.
    if not (api_response and api_response.generated_images):
      return api_response

    prompt_safety = None
    kept_images = []
    for candidate in api_response.generated_images:
      attributes = candidate.safety_attributes
      if attributes and attributes.content_type == 'Positive Prompt':
        # This entry only carries the prompt's safety attributes; if several
        # are present, the last one wins.
        prompt_safety = attributes
        continue
      kept_images.append(candidate)

    return types.GenerateImagesResponse(
        generated_images=kept_images,
        positive_prompt_safety_attributes=prompt_safety,
    )

  async def upscale_image(
      self,
      *,
      model: str,
      image: types.ImageOrDict,
      upscale_factor: str,
      config: Optional[types.UpscaleImageConfigOrDict] = None,
  ) -> types.UpscaleImageResponse:
    """Makes an API request to upscale a provided image.

    The user-facing config is validated, then translated into the internal API
    config, which additionally pins `mode` to 'upscale' and `number_of_images`
    to 1 before the request is sent.

    Args:
      model (str): The model to use.
      image (Image): The input image for upscaling.
      upscale_factor (str): The factor to upscale the image (x2 or x4).
      config (UpscaleImageConfig): Configuration for upscaling.

    Returns:
      The `UpscaleImageResponse` returned by the underlying request.

    Usage:

    .. code-block:: python

      from google.genai import types

      IMAGE_FILE_PATH="my-image.png"
      response = await client.aio.models.upscale_image(
          model='imagen-3.0-generate-001',
          image=types.Image.from_file(location=IMAGE_FILE_PATH),
          upscale_factor='x2',
      )
      response.generated_images[0].image.show()
      # Opens my-image.png which is upscaled by a factor of 2.
    """

    # Constructing the parameters object validates the arguments; the
    # instance itself is not needed afterwards.
    types.UpscaleImageParameters(
        model=model,
        image=image,
        upscale_factor=upscale_factor,
        config=config,
    )

    # Flatten whatever the caller supplied (model, dict or nothing) to a dict.
    supplied = config or {}
    raw_config = (
        supplied.model_dump()
        if isinstance(supplied, types.UpscaleImageConfig)
        else dict(supplied)
    )

    # Copy the user-settable options over to the API config.
    api_config = types._UpscaleImageAPIConfigDict(
        http_options=raw_config.get('http_options'),
        output_gcs_uri=raw_config.get('output_gcs_uri'),
        safety_filter_level=raw_config.get('safety_filter_level'),
        person_generation=raw_config.get('person_generation'),
        include_rai_reason=raw_config.get('include_rai_reason'),
        output_mime_type=raw_config.get('output_mime_type'),
        output_compression_quality=raw_config.get(
            'output_compression_quality'
        ),
        enhance_input_image=raw_config.get('enhance_input_image'),
        image_preservation_factor=raw_config.get('image_preservation_factor'),
        labels=raw_config.get('labels'),
    )  # pylint: disable=protected-access

    # These values are fixed for upscaling and are not user-configurable here.
    api_config['mode'] = 'upscale'
    api_config['number_of_images'] = 1

    upscale_response = await self._upscale_image(
        model=model,
        image=image,
        upscale_factor=upscale_factor,
        config=api_config,
    )
    return upscale_response

  async def generate_videos(
      self,
      *,
      model: str,
      prompt: Optional[str] = None,
      image: Optional[types.ImageOrDict] = None,
      video: Optional[types.VideoOrDict] = None,
      source: Optional[types.GenerateVideosSourceOrDict] = None,
      config: Optional[types.GenerateVideosConfigOrDict] = None,
  ) -> types.GenerateVideosOperation:
    """Generates videos based on an input (text, image, or video) and configuration.

    The following use cases are supported:
    1. Text to video generation.
    2a. Image to video generation (additional text prompt is optional).
    2b. Image to video generation with frame interpolation (specify last_frame
    in config).
    3. Video extension (additional text prompt is optional)

    When the client targets the Gemini Developer API (not Vertex AI) and the
    input video carries both a `uri` and `video_bytes`, the bytes are dropped
    and only the `uri` and `mime_type` are sent. This applies to the `video`
    argument and to the video nested in `source`, whether either is given as
    a model object or as a dict.

    Args:
      model: The model to use.
      prompt: The text prompt for generating the videos. Optional for image to
        video and video extension use cases. This argument is deprecated, please
        use source instead.
      image: The input image for generating the videos. Optional if prompt is
        provided. This argument is deprecated, please use source instead.
      video: The input video for video extension use cases. Optional if prompt
        or image is provided. This argument is deprecated, please use source
        instead.
      source: The input source for generating the videos (prompt, image, and/or
        video)
      config: Configuration for generation.

    Returns:
      The `GenerateVideosOperation` returned by the underlying request.

    Raises:
      ValueError: If `source` is combined with any of `prompt`, `image` or
        `video`.

    Usage:

    .. code-block:: python

      import asyncio

      operation = await client.aio.models.generate_videos(
          model="veo-2.0-generate-001",
          source=types.GenerateVideosSource(
              prompt="A neon hologram of a cat driving at top speed",
          ),
      )
      while not operation.done:
          await asyncio.sleep(10)
          operation = await client.aio.operations.get(operation)

      operation.result.generated_videos[0].video.uri
    """
    if (prompt or image or video) and source:
      raise ValueError(
          'Source and prompt/image/video are mutually exclusive.'
          + ' Please only use source.'
      )
    # Gemini Developer API does not support video bytes.
    video_dct: dict[str, Any] = {}
    if not self._api_client.vertexai:
      if video:
        if isinstance(video, types.Video):
          video_dct = video.model_dump()
        else:
          video_dct = dict(video)

        if video_dct.get('uri') and video_dct.get('video_bytes'):
          video = types.Video(
              uri=video_dct.get('uri'), mime_type=video_dct.get('mime_type')
          )
      elif source:
        if isinstance(source, types.GenerateVideosSource):
          source_dct = source.model_dump()
        else:
          source_dct = dict(source)

        # The nested video may be a Video model (dict-form source holding a
        # model), a plain dict (dumped model or dict-form video), or absent.
        source_video = source_dct.get('video')
        if isinstance(source_video, types.Video):
          video_dct = source_video.model_dump()
        elif isinstance(source_video, dict):
          video_dct = dict(source_video)

        if video_dct.get('uri') and video_dct.get('video_bytes'):
          # Rebuild the source with a bytes-free video; prompt and image are
          # carried over unchanged.
          source = types.GenerateVideosSource(
              prompt=source_dct.get('prompt'),
              image=source_dct.get('image'),
              video=types.Video(
                  uri=video_dct.get('uri'),
                  mime_type=video_dct.get('mime_type'),
              ),
          )
    return await self._generate_videos(
        model=model,
        prompt=prompt,
        image=image,
        video=video,
        source=source,
        config=config,
    )

  async def embed_content(
      self,
      *,
      model: str,
      contents: Union[types.ContentListUnion, types.ContentListUnionDict],
      config: Optional[types.EmbedContentConfigOrDict] = None,
  ) -> types.EmbedContentResponse:
    """Calculates embeddings for the given contents.

    Routing depends on the backend and the model:

    * Gemini Developer API: the request is sent directly; for models whose
      name contains 'gemini-embedding-2' the contents are first normalized
      with `t.t_contents`.
    * Vertex AI, model accepted by `t.t_is_vertex_embed_content_model`: the
      normalized contents may hold at most one item, whose first element is
      sent with the `EMBED_CONTENT` API type.
    * Vertex AI, any other model: the request uses the `PREDICT` API type.

    Args:
      model (str): The model to use.
      contents (list[Content]): The contents to embed.
      config (EmbedContentConfig): Optional configuration for embeddings.

    Returns:
      The `EmbedContentResponse` returned by the underlying request.

    Raises:
      ValueError: On Vertex AI, if an embedContent-style model is given more
        than one content.

    Usage:

    .. code-block:: python

      embeddings = await client.aio.models.embed_content(
          model= 'text-embedding-004',
          contents=[
              'What is your name?',
              'What is your favorite color?',
          ],
          config={
              'output_dimensionality': 64
          },
      )

      multimodal_embeddings = await client.aio.models.embed_content(
          model='gemini-embedding-2-preview',
          contents=[
              types.Part.from_uri(
                  file_uri='gs://generativeai-downloads/images/scones.jpg',
                  mime_type='image/jpeg',
              ),
          ],
          config={
              'output_dimensionality': 64
          },
      )
    """
    # Gemini Developer API path.
    if not self._api_client.vertexai:
      if 'gemini-embedding-2' in model:
        contents = t.t_contents(contents)  # type: ignore[assignment]
      return await self._embed_content(
          model=model, contents=contents, config=config
      )

    # Vertex AI path for models served through the predict endpoint.
    if not t.t_is_vertex_embed_content_model(model):
      return await self._embed_content(
          model=model,
          contents=contents,
          embedding_api_type=types.EmbeddingApiType.PREDICT,
          config=config,
      )

    # Vertex AI path for embedContent-style models: single content only.
    content_items = t.t_contents(contents)
    if len(content_items) > 1:
      raise ValueError(
          'The embedContent API for this model only supports one content at a'
          ' time.'
      )
    return await self._embed_content(
        model=model,
        contents=contents,
        content=content_items[0],
        embedding_api_type=types.EmbeddingApiType.EMBED_CONTENT,
        config=config,
    )
