Source code for onnxscript.ir.serde

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Serialize and deserialize the intermediate representation to/from ONNX protos."""

# NOTES for developers:
# NOTE: Do not import pathlib in the IR. It is slow. Use os.path methods instead.
#
# NOTE: Protobuf serialization
#     Initializing a protobuf message with initialized protobuf messages incurs
#     a copy and is slow. Instead, use proto.add() to add to a repeated field,
#     or initialize the message first and then set the fields if the fields are
#     plain Python objects.

from __future__ import annotations

import functools

__all__ = [
    # Tensors
    "TensorProtoTensor",
    # Deserialization
    "from_proto",
    "deserialize_attribute",
    "deserialize_dimension",
    "deserialize_function",
    "deserialize_graph",
    "deserialize_metadata_props",
    "deserialize_model",
    "deserialize_node",
    "deserialize_opset_import",
    "deserialize_tensor",
    "deserialize_type_proto_for_shape",
    "deserialize_type_proto_for_type",
    "deserialize_value_info_proto",
    # Serialization
    "to_proto",
    "serialize_attribute_into",
    "serialize_attribute",
    "serialize_dimension_into",
    "serialize_function_into",
    "serialize_function",
    "serialize_graph_into",
    "serialize_graph",
    "serialize_model_into",
    "serialize_model",
    "serialize_node_into",
    "serialize_node",
    "serialize_shape_into",
    "serialize_reference_attribute_into",
    "serialize_tensor_into",
    "serialize_tensor",
    "serialize_type_into",
    "serialize_type",
    "serialize_value_into",
    "serialize_value",
    "SerdeError",
]

import collections
import logging
import os
import typing
from typing import Any, Callable, List, Mapping, Sequence

import numpy as np
import onnx
import onnx.external_data_helper

from onnxscript.ir import _core, _enums, _metadata, _protocols, _type_casting

if typing.TYPE_CHECKING:
    import google.protobuf.internal.containers as proto_containers
    import numpy.typing as npt

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Message text pointing contributors at the onnxscript repository.
_PLEASE_CONTRIBUTE = (
    "Please contribute by creating a PR at https://github.com/microsoft/onnxscript."
)
# deserialize_model compares a model's ir_version against this value: models
# below it get the experimental function value-info handling.
_FUNCTION_VALUE_INFO_SUPPORTED_VERSION = (
    10  # ONNX IR version where value info in functions was introduced
)
# Type variable bound to callables; used by _capture_errors so the decorator
# is typed as returning the same callable type it receives.
_T = typing.TypeVar("_T", bound=Callable[..., Any])


class SerdeError(RuntimeError):
    """Raised when serializing to, or deserializing from, ONNX protos fails.

    The ``_capture_errors`` decorator in this module raises this error,
    chaining the underlying exception as its cause.
    """


def _capture_errors(arg_capturer: Callable[..., str]) -> Callable[[_T], _T]:
    """Decorator to capture errors and display the stack."""

    def decorator(func: _T) -> _T:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                raise SerdeError(
                    f"Error calling {func.__name__} with: {arg_capturer(*args, **kwargs)}"
                ) from e

        return wrapper  # type: ignore

    return decorator


def _little_endian_dtype(dtype) -> np.dtype:
    """Create a small endian dtype on all platforms.

    This is useful because ONNX always stores raw_data in small endian. On big
    endian platforms, we still need to interpret the raw_data in small endian.
    """
    return np.dtype(dtype).newbyteorder("<")


def _unflatten_complex(
    array: npt.NDArray[np.float32 | np.float64],
) -> npt.NDArray[np.complex64 | np.complex128]:
    """Convert the real representation of a complex dtype to the complex dtype."""
    return array[::2] + 1j * array[1::2]


def from_proto(
    proto: onnx.ModelProto
    | onnx.GraphProto
    | onnx.NodeProto
    | onnx.TensorProto
    | onnx.AttributeProto
    | onnx.ValueInfoProto
    | onnx.TypeProto
    | onnx.FunctionProto,
) -> Any:
    """Deserialize an ONNX proto message to an IR object.

    The proto is dispatched on its message type to the matching
    ``deserialize_*`` function in this module. A ``TypeProto`` is turned into
    a ``TypeAndShape`` holding both its type and its shape.

    Args:
        proto: The ONNX proto message to convert.

    Returns:
        The IR object produced by the matching deserializer.

    Raises:
        NotImplementedError: If the proto type is not one of the supported ones.
    """
    # Checked in order; the first matching proto type wins.
    converters = (
        (onnx.ModelProto, deserialize_model),
        (onnx.GraphProto, deserialize_graph),
        (onnx.NodeProto, deserialize_node),
        (onnx.TensorProto, deserialize_tensor),
        (onnx.AttributeProto, deserialize_attribute),
        (
            onnx.ValueInfoProto,
            lambda message: deserialize_value_info_proto(message, None),
        ),
        (
            onnx.TypeProto,
            lambda message: _core.TypeAndShape(
                deserialize_type_proto_for_type(message),
                deserialize_type_proto_for_shape(message),
            ),
        ),
        (onnx.FunctionProto, deserialize_function),
    )
    for proto_type, convert in converters:
        if isinstance(proto, proto_type):
            return convert(proto)
    raise NotImplementedError(
        f"Deserialization of {type(proto)} in from_proto is not implemented. "
        "Use a specific ir.serde.deserialize* function instead."
    )


def to_proto(
    ir_object: _protocols.ModelProtocol
    | _protocols.GraphProtocol
    | _protocols.NodeProtocol
    | _protocols.ValueProtocol
    | _protocols.AttributeProtocol
    | _protocols.ReferenceAttributeProtocol
    | _protocols.TensorProtocol
    | _protocols.TypeProtocol
    | _protocols.GraphViewProtocol
    | _protocols.FunctionProtocol,
) -> Any:
    """Serialize an IR object to a proto.

    The object is tested against the IR protocols and handed to the matching
    ``serialize_*`` function in this module.

    Args:
        ir_object: The IR object to convert.

    Returns:
        The ONNX proto message produced by the matching serializer.

    Raises:
        NotImplementedError: If the object matches none of the supported protocols.
    """
    # The protocols are tested in exactly this order and the first match wins.
    # NOTE(review): an object may presumably satisfy more than one protocol, so
    # do not reorder the entries without confirming.
    serializers = (
        (_protocols.ModelProtocol, serialize_model),
        (_protocols.GraphProtocol, serialize_graph),
        (_protocols.NodeProtocol, serialize_node),
        (_protocols.TensorProtocol, serialize_tensor),
        (_protocols.ValueProtocol, serialize_value),
        (_protocols.AttributeProtocol, serialize_attribute),
        (
            _protocols.ReferenceAttributeProtocol,
            lambda obj: serialize_reference_attribute_into(onnx.AttributeProto(), obj),
        ),
        (
            _protocols.TypeProtocol,
            lambda obj: serialize_type_into(onnx.TypeProto(), obj),
        ),
        (_protocols.GraphViewProtocol, serialize_graph),
        (_protocols.FunctionProtocol, serialize_function),
    )
    for protocol, serialize in serializers:
        if isinstance(ir_object, protocol):
            return serialize(ir_object)
    raise NotImplementedError(
        f"Serialization of {type(ir_object)} in to_proto is not implemented. "
        "Use a specific ir.serde.serialize* function instead."
    )


class TensorProtoTensor(_core.TensorBase):  # pylint: disable=too-many-ancestors
    """A tensor initialized from a tensor proto.

    The proto is kept by reference; name, shape, dtype and doc string are read
    from it on access.
    """

    def __init__(self, proto: onnx.TensorProto) -> None:
        """Wrap ``proto`` and deserialize its metadata properties."""
        self._proto = proto
        self._metadata_props: dict[str, str] | None = deserialize_metadata_props(
            proto.metadata_props
        )
        # Created lazily on first access to ``meta``.
        self._metadata: _metadata.MetadataStore | None = None

    @property
    def name(self) -> str:
        """The name stored in the wrapped proto."""
        return self._proto.name

    @name.setter
    def name(self, value: str | None) -> None:
        # Assigning None clears the proto field rather than storing a value.
        if value is not None:
            self._proto.name = value
        else:
            self._proto.ClearField("name")

    @property
    def shape(self) -> _core.Shape:
        """A frozen shape built from the proto's ``dims``."""
        dims = self._proto.dims
        return _core.Shape(dims, frozen=True)

    @property
    def dtype(self) -> _enums.DataType:
        """The proto's ``data_type`` as an IR ``DataType``."""
        data_type = self._proto.data_type
        return _enums.DataType(data_type)

    @property
    def doc_string(self) -> str:
        """The doc string stored in the wrapped proto."""
        return self._proto.doc_string

    @property
    def raw(self) -> onnx.TensorProto:
        """The wrapped tensor proto itself."""
        return self._proto

    def __repr__(self) -> str:
        # The content is not displayed: it can have types unsupported by numpy.
        # Preferably some content would be shown when the tensor is small.
        base = self._repr_base()
        return f"{base}(name={self.name!r})"

    def __array__(self, dtype: Any = None) -> np.ndarray:
        """Return the tensor as a numpy array, compatible with np.array."""
        array = self.numpy()
        return array.__array__(dtype)

    def __dlpack__(self, *, stream: Any = None) -> Any:
        """Delegate to the ``__dlpack__`` of the numpy array for this tensor."""
        array = self.numpy()
        return array.__dlpack__(stream=stream)

    def __dlpack_device__(self) -> tuple[int, int]:
        """Delegate to the ``__dlpack_device__`` of the numpy array for this tensor."""
        array = self.numpy()
        return array.__dlpack_device__()

[docs] def numpy(self) -> np.ndarray: """Return the tensor as a numpy array. This is an improved version of onnx.numpy_helper.to_array. It first reads the data using the dtype corresponding to the tensor proto data field, then converts it to the correct dtype and shape. Special cases are bfloat16, complex and int4 where we need to reinterpret the data. Other types can simply be casted. When the data type is not supported by numpy, the dtypes from the ``ml_dtype`` package are used. The values can be reinterpreted as bit representations using the ``.view()`` method. When the data type is a string, this method returns a numpy array of bytes instead of a numpy array of strings, to follow the ONNX specification. External tensors are not supported by this class. Use :class:`onnxscript.ir.ExternalTensor` instead. Raises: ValueError: If the data type is UNDEFINED. """ dtype = self.dtype if dtype == _enums.DataType.UNDEFINED: raise ValueError("Cannot convert UNDEFINED tensor to numpy array.") if self._proto.data_location == onnx.TensorProto.EXTERNAL: raise ValueError( "Cannot convert external tensor to numpy array. " "Use ir.ExternalTensor instead." 
) if self._proto.HasField("raw_data"): array = np.frombuffer(self._proto.raw_data, dtype=dtype.numpy().newbyteorder("<")) # Cannot return now, because we may need to unpack 4bit tensors elif dtype == _enums.DataType.STRING: return np.array(self._proto.string_data).reshape(self._proto.dims) elif self._proto.int32_data: array = np.array(self._proto.int32_data, dtype=_little_endian_dtype(np.int32)) if dtype in {_enums.DataType.FLOAT16, _enums.DataType.BFLOAT16}: # Reinterpret the int32 as float16 or bfloat16 array = array.astype(np.uint16).view(dtype.numpy()) elif dtype in { _enums.DataType.FLOAT8E4M3FN, _enums.DataType.FLOAT8E4M3FNUZ, _enums.DataType.FLOAT8E5M2, _enums.DataType.FLOAT8E5M2FNUZ, }: array = array.astype(np.uint8).view(dtype.numpy()) elif self._proto.int64_data: array = np.array(self._proto.int64_data, dtype=_little_endian_dtype(np.int64)) elif self._proto.uint64_data: array = np.array(self._proto.uint64_data, dtype=_little_endian_dtype(np.uint64)) elif self._proto.float_data: array = np.array(self._proto.float_data, dtype=_little_endian_dtype(np.float32)) if dtype == _enums.DataType.COMPLEX64: array = _unflatten_complex(array) elif self._proto.double_data: array = np.array(self._proto.double_data, dtype=_little_endian_dtype(np.float64)) if dtype == _enums.DataType.COMPLEX128: array = _unflatten_complex(array) else: # Empty tensor if not self._proto.dims: # When dims not precent and there is no data, we return an empty array return np.array([], dtype=dtype.numpy()) else: # Otherwise we return a size 0 array with the correct shape return np.zeros(self._proto.dims, dtype=dtype.numpy()) if dtype == _enums.DataType.INT4: return _type_casting.unpack_int4(array.astype(np.uint8), self._proto.dims) elif dtype == _enums.DataType.UINT4: return _type_casting.unpack_uint4(array.astype(np.uint8), self._proto.dims) else: # Otherwise convert to the correct dtype and reshape # Note we cannot use view() here because the storage dtype may not be the same size as the 
target return array.astype(dtype.numpy()).reshape(self._proto.dims)
[docs] def tobytes(self) -> bytes: """Return the tensor as a byte string conformed to the ONNX specification, in little endian. Raises: ValueError: If the tensor is a string tensor or an external tensor. ValueError: If the tensor is of UNDEFINED data type. """ if self._proto.data_location == onnx.TensorProto.EXTERNAL: raise ValueError( "Cannot convert external tensor to bytes. Use ir.ExternalTensor instead." ) if self.dtype == _enums.DataType.STRING: raise ValueError("Cannot convert string tensor to bytes.") if self.dtype == _enums.DataType.UNDEFINED: raise ValueError("Cannot convert UNDEFINED tensor to bytes.") if self._proto.HasField("raw_data"): return self._proto.raw_data if self._proto.float_data: return np.array( self._proto.float_data, dtype=_little_endian_dtype(np.float32) ).tobytes() if self._proto.int32_data: array = np.array(self._proto.int32_data, dtype=np.int32) if self.dtype in { _enums.DataType.INT16, _enums.DataType.UINT16, _enums.DataType.FLOAT16, _enums.DataType.BFLOAT16, }: return array.astype(_little_endian_dtype(np.uint16)).tobytes() if self.dtype in { _enums.DataType.INT8, _enums.DataType.UINT8, _enums.DataType.BOOL, _enums.DataType.FLOAT8E4M3FN, _enums.DataType.FLOAT8E4M3FNUZ, _enums.DataType.FLOAT8E5M2, _enums.DataType.FLOAT8E5M2FNUZ, _enums.DataType.INT4, _enums.DataType.UINT4, }: # uint4 and int4 values are already packed, even when stored as int32 # so we don't need to pack them again return array.astype(_little_endian_dtype(np.uint8)).tobytes() assert self.dtype == _enums.DataType.INT32 return array.tobytes() if self._proto.int64_data: return np.array( self._proto.int64_data, dtype=_little_endian_dtype(np.int64) ).tobytes() if self._proto.double_data: return np.array( self._proto.double_data, dtype=_little_endian_dtype(np.float64) ).tobytes() if self._proto.uint64_data: array = np.array(self._proto.uint64_data, dtype=_little_endian_dtype(np.uint64)) if self.dtype == _enums.DataType.UINT32: return 
array.astype(_little_endian_dtype(np.uint32)).tobytes() assert self.dtype == _enums.DataType.UINT64 return array.tobytes() # The repeating fields can be empty and still valid. # For example, int32_data can be empty and still be a valid tensor. return b""
@property def meta(self) -> _metadata.MetadataStore: """The metadata store for intermediate analysis. Write to the :attr:`metadata_props` if you would like the metadata to be serialized to the ONNX proto. """ if self._metadata is None: self._metadata = _metadata.MetadataStore() return self._metadata @property def metadata_props(self) -> dict[str, str]: if self._metadata_props is None: self._metadata_props = {} return self._metadata_props def _get_field(proto: Any, field: str) -> Any: if proto.HasField(field): return getattr(proto, field) return None # Deserialization def deserialize_opset_import( protos: Sequence[onnx.OperatorSetIdProto], ) -> dict[str, int]: return {opset.domain: opset.version for opset in protos} def _parse_experimental_function_value_info_name( name: str, ) -> tuple[str, str, str] | None: """Get the function domain, name and value name if the value info is for a function. The experimental format is: {function_domain}::{function_name}/{value_name} Args: name: The name stored in the value info. Returns: A tuple of the function domain, function name and value name if the value info is for a function. None otherwise. 
""" parts = name.split("/") expected_parts = 2 if len(parts) != expected_parts: return None function, value_name = parts parts = function.split("::") if len(parts) != expected_parts: return None # NOTE: There will not be overload because overloads are introduced in ONNX IR v10, which also # introduces the ValueInfoProto for functions function_domain, function_name = parts return function_domain, function_name, value_name def deserialize_model(proto: onnx.ModelProto) -> _core.Model: graph = _deserialize_graph(proto.graph, []) graph.opset_imports.update(deserialize_opset_import(proto.opset_import)) functions = [] for func in proto.functions: functions.append(deserialize_function(func)) model = _core.Model( graph, ir_version=proto.ir_version, producer_name=_get_field(proto, "producer_name"), producer_version=_get_field(proto, "producer_version"), domain=_get_field(proto, "domain"), model_version=_get_field(proto, "model_version"), doc_string=_get_field(proto, "doc_string"), functions=functions, meta_data_props=deserialize_metadata_props(proto.metadata_props), ) # Handle experimental value info for functions created by the dynamo exporter in IR version 9 if model.ir_version < _FUNCTION_VALUE_INFO_SUPPORTED_VERSION: _deserialized_experimental_value_info_for_function_ir9( model.functions, proto.graph.value_info ) return model def _deserialized_experimental_value_info_for_function_ir9( functions: Mapping[_protocols.OperatorIdentifier, _core.Function], value_info_protos: Sequence[onnx.ValueInfoProto], ) -> None: """Deserialize value info for functions when they are stored in an experimental format. 
The experimental format is: {function_domain}::{function_name}/{value_name} """ # Parse value info for functions from the main graph function_value_value_info_mapping: collections.defaultdict[ _protocols.OperatorIdentifier, dict[str, onnx.ValueInfoProto], ] = collections.defaultdict(dict) for value_info_proto in value_info_protos: if ( parsed := _parse_experimental_function_value_info_name(value_info_proto.name) ) is None: continue function_domain, function_name, value_name = parsed function_overload = "" # TODO(justinchuby): Create a constructor for OperatorIdentifier so we don't create tuples manually function_id = (function_domain, function_name, function_overload) function = functions.get(function_id) if function is None: # Function not found logger.debug( "Function with ID '%s' not found in model functions. Value info '%s' will be ignored.", function_id, value_info_proto.name, ) continue function_value_value_info_mapping[function_id][value_name] = value_info_proto for function_id, function in functions.items(): for input in function.inputs: if input.name in function_value_value_info_mapping[function_id]: deserialize_value_info_proto( function_value_value_info_mapping[function_id][input.name], input ) for node in function: for output in node.outputs: if output.name in function_value_value_info_mapping[function_id]: deserialize_value_info_proto( function_value_value_info_mapping[function_id][output.name], output, ) # The function outputs are handled as well because they are also node outputs def deserialize_graph(proto: onnx.GraphProto) -> _core.Graph: """Deserialize a graph proto, recursively if needed. Args: proto: The graph proto to deserialize. Returns: IR Graph. """ return _deserialize_graph(proto, []) @_capture_errors(lambda proto, scoped_values: proto.name) def _deserialize_graph( proto: onnx.GraphProto, scoped_values: list[dict[str, _core.Value]] ) -> _core.Graph: """Deserialize a graph proto, recursively if needed. 
Args: proto: The graph proto to deserialize. scoped_values: A list of dictionaries mapping value names to their corresponding Value objects. Every time we enter a new graph, a new scope is created and appended to this list to include all values defined in the scope. scoped_value_info: A list of dictionaries mapping value names to their corresponding ValueInfoProto. Returns: IR Graph. """ # Create values for initializers and inputs initializer_tensors = [deserialize_tensor(tensor) for tensor in proto.initializer] inputs = [_core.Input(info.name) for info in proto.input] for info, value in zip(proto.input, inputs): deserialize_value_info_proto(info, value) # Initialize the values dictionary for this graph scope with the inputs and initializers values: dict[str, _core.Value] = {v.name: v for v in inputs} # type: ignore[misc] scoped_values.append(values) initializer_values = [] for tensor in initializer_tensors: if tensor.name in values: # The initializer is for an input initializer_value = values[tensor.name] initializer_value.const_value = tensor else: # The initializer is for some other value. 
Create this value first initializer_value = _core.Value( None, index=None, name=tensor.name, # TODO(justinchuby): Fix type hinting for shape and dtype shape=tensor.shape, # type: ignore type=_core.TensorType(tensor.dtype), const_value=tensor, ) values[tensor.name] = initializer_value # type: ignore[index] initializer_values.append(initializer_value) # Add ValueInfos for this graph scope value_info = {info.name: info for info in proto.value_info} # Deserialize nodes with all known values nodes = [_deserialize_node(node, scoped_values, value_info) for node in proto.node] # Fill in values for graph outputs outputs = [deserialize_value_info_proto(info, values[info.name]) for info in proto.output] scoped_values.pop() return _core.Graph( inputs, outputs, nodes=nodes, initializers=initializer_values, doc_string=_get_field(proto, "doc_string"), name=_get_field(proto, "name"), metadata_props=deserialize_metadata_props(proto.metadata_props), ) @_capture_errors(lambda proto: proto.name) def deserialize_function(proto: onnx.FunctionProto) -> _core.Function: inputs = [_core.Input(name) for name in proto.input] values: dict[str, _core.Value] = {v.name: v for v in inputs} # type: ignore[misc] value_info = {info.name: info for info in getattr(proto, "value_info", [])} # TODO(justinchuby): Handle unsorted nodes nodes = [_deserialize_node(node, [values], value_info=value_info) for node in proto.node] outputs = [values[name] for name in proto.output] graph = _core.Graph( inputs, outputs, nodes=nodes, initializers=(), doc_string=_get_field(proto, "doc_string"), opset_imports=deserialize_opset_import(proto.opset_import), name=( f"{proto.name}_{proto.domain}" + f"__{proto.overload}" if hasattr(proto, "overload") and proto.overload else "" ), ) attributes = [_deserialize_attribute(attr, []) for attr in proto.attribute_proto] # Attributes without defaults attributes += [ _core.Attr(name, _enums.AttributeType.UNDEFINED, None) for name in proto.attribute ] return _core.Function( 
domain=proto.domain, name=proto.name, overload=getattr(proto, "overload", ""), graph=graph, attributes=typing.cast(List[_core.Attr], attributes), metadata_props=deserialize_metadata_props(proto.metadata_props), ) @_capture_errors(lambda proto, value: str(proto)) def deserialize_value_info_proto( proto: onnx.ValueInfoProto, value: _core.Value | None ) -> _core.Value: if value is None: value = _core.Value(name=proto.name) value.shape = deserialize_type_proto_for_shape(proto.type) value.type = deserialize_type_proto_for_type(proto.type) metadata_props = deserialize_metadata_props(proto.metadata_props) if metadata_props is not None: value.metadata_props.update(metadata_props) value.doc_string = _get_field(proto, "doc_string") return value @_capture_errors(str) def deserialize_type_proto_for_shape(proto: onnx.TypeProto) -> _core.Shape | None: if proto.HasField("tensor_type"): if (shape_proto := _get_field(proto.tensor_type, "shape")) is None: return None # This logic handles when the shape is [] as well dim_protos = shape_proto.dim deserialized_dim_denotations = [ deserialize_dimension(dim_proto) for dim_proto in dim_protos ] dims = [dim for dim, _ in deserialized_dim_denotations] denotations = [denotation for _, denotation in deserialized_dim_denotations] return _core.Shape(dims, denotations=denotations, frozen=True) if proto.HasField("sparse_tensor_type"): if (shape_proto := _get_field(proto.sparse_tensor_type, "shape")) is None: return None dim_protos = shape_proto.dim deserialized_dim_denotations = [ deserialize_dimension(dim_proto) for dim_proto in dim_protos ] dims = [dim for dim, _ in deserialized_dim_denotations] denotations = [denotation for _, denotation in deserialized_dim_denotations] return _core.Shape(dims, denotations=denotations, frozen=True) if proto.HasField("sequence_type"): if (elem_type := _get_field(proto.sequence_type, "elem_type")) is None: return None return deserialize_type_proto_for_shape(elem_type) if proto.HasField("optional_type"): if 
(elem_type := _get_field(proto.optional_type, "elem_type")) is None: return None return deserialize_type_proto_for_shape(elem_type) if proto.HasField("map_type"): # TODO(justinchuby): Do we need to support map types? raise NotImplementedError(f"Map types are not supported yet. {_PLEASE_CONTRIBUTE}") return None @_capture_errors(str) def deserialize_type_proto_for_type( proto: onnx.TypeProto, ) -> _protocols.TypeProtocol | None: denotation = _get_field(proto, "denotation") if proto.HasField("tensor_type"): if (elem_type := _get_field(proto.tensor_type, "elem_type")) is None: return None return _core.TensorType(_enums.DataType(elem_type), denotation=denotation) if proto.HasField("sparse_tensor_type"): if (elem_type := _get_field(proto.sparse_tensor_type, "elem_type")) is None: return None return _core.SparseTensorType(_enums.DataType(elem_type), denotation=denotation) if proto.HasField("sequence_type"): # FIXME(justinchuby): Allow nested types being None if (elem_type := _get_field(proto.sequence_type, "elem_type")) is None: raise ValueError(f"SequenceTypeProto must have elem_type set: {proto}") nested_type = deserialize_type_proto_for_type(elem_type) if nested_type is None: raise ValueError(f"SequenceType must have elem_type set: {proto}") return _core.SequenceType(nested_type, denotation=denotation) if proto.HasField("optional_type"): # FIXME(justinchuby): Allow nested types being None if (elem_type := _get_field(proto.optional_type, "elem_type")) is None: raise ValueError(f"SequenceTypeProto must have elem_type set: {proto}") nested_type = deserialize_type_proto_for_type(elem_type) if nested_type is None: raise ValueError(f"SequenceType must have elem_type set: {proto}") return _core.OptionalType(nested_type, denotation=denotation) if proto.HasField("map_type"): # TODO(justinchuby): Do we need to support map types? raise NotImplementedError(f"Map types are not supported yet. 
{_PLEASE_CONTRIBUTE}") return None @_capture_errors(str) def deserialize_dimension( proto: onnx.TensorShapeProto.Dimension, ) -> tuple[int | _core.SymbolicDim, str | None]: """Deserialize a dimension proto into (dimension, denotation). Args: proto: The dimension proto to deserialize. Returns: A tuple of the dimension and its denotation. """ value_field = proto.WhichOneof("value") denotation = _get_field(proto, "denotation") if value_field is not None: value = getattr(proto, value_field) if value_field == "dim_value": return value, denotation if value_field == "dim_param": return _core.SymbolicDim(value), denotation return _core.SymbolicDim(None), denotation @_capture_errors(lambda proto, base_path: proto.name) def deserialize_tensor( proto: onnx.TensorProto, base_path: str | os.PathLike = "" ) -> _protocols.TensorProtocol: # TODO: Sanitize base_path if proto.data_location == onnx.TensorProto.EXTERNAL: external_info = onnx.external_data_helper.ExternalDataInfo(proto) return _core.ExternalTensor( external_info.location, offset=external_info.offset, length=external_info.length, dtype=_enums.DataType(proto.data_type), base_dir=base_path, name=_get_field(proto, "name"), shape=_core.Shape(proto.dims), doc_string=_get_field(proto, "doc_string"), metadata_props=deserialize_metadata_props(proto.metadata_props), ) if proto.data_type == _enums.DataType.STRING: name = _get_field(proto, "name") doc_string = _get_field(proto, "doc_string") metadata_props = deserialize_metadata_props(proto.metadata_props) return _core.StringTensor( proto.string_data, shape=_core.Shape(proto.dims), name=name, doc_string=doc_string, metadata_props=metadata_props, ) return TensorProtoTensor(proto) def deserialize_metadata_props( proto: Sequence[onnx.StringStringEntryProto], ) -> dict[str, str] | None: if len(proto) == 0: # Avoid creating an empty dictionary to save memory return None return {entry.key: entry.value for entry in proto} def deserialize_attribute(proto: onnx.AttributeProto) -> 
_core.Attr | _core.RefAttr: return _deserialize_attribute(proto, []) @_capture_errors(lambda proto, scoped_values: str(proto)) def _deserialize_attribute( proto: onnx.AttributeProto, scoped_values: list[dict[str, _core.Value]] ) -> _core.Attr | _core.RefAttr: name = proto.name doc_string = _get_field(proto, "doc_string") type_ = _enums.AttributeType(proto.type) ref_attr_name = _get_field(proto, "ref_attr_name") if ref_attr_name: return _core.RefAttr(name, ref_attr_name, type_, doc_string=doc_string) if type_ == _enums.AttributeType.INT: return _core.AttrInt64(name, proto.i, doc_string=doc_string) if type_ == _enums.AttributeType.FLOAT: return _core.AttrFloat32(name, proto.f, doc_string=doc_string) if type_ == _enums.AttributeType.STRING: return _core.AttrString(name, proto.s.decode("utf-8"), doc_string=doc_string) if type_ == _enums.AttributeType.INTS: return _core.AttrInt64s(name, proto.ints, doc_string=doc_string) if type_ == _enums.AttributeType.FLOATS: return _core.AttrFloat32s(name, proto.floats, doc_string=doc_string) if type_ == _enums.AttributeType.STRINGS: return _core.AttrStrings( name, [s.decode("utf-8") for s in proto.strings], doc_string=doc_string ) if type_ == _enums.AttributeType.TENSOR: return _core.AttrTensor(name, deserialize_tensor(proto.t), doc_string=doc_string) if type_ == _enums.AttributeType.GRAPH: return _core.AttrGraph( name, _deserialize_graph(proto.g, scoped_values), doc_string=doc_string ) if type_ == _enums.AttributeType.TENSORS: return _core.AttrTensors( name, [deserialize_tensor(t) for t in proto.tensors], doc_string=doc_string, ) if type_ == _enums.AttributeType.GRAPHS: return _core.AttrGraphs( name, [_deserialize_graph(g, scoped_values) for g in proto.graphs], doc_string=doc_string, ) if type_ == _enums.AttributeType.SPARSE_TENSOR: raise NotImplementedError( f"Sparse tensors are not supported yet. 
{_PLEASE_CONTRIBUTE}" ) if type_ == _enums.AttributeType.SPARSE_TENSORS: raise NotImplementedError( f"Sparse tensors are not supported yet. {_PLEASE_CONTRIBUTE}" ) if type_ == _enums.AttributeType.TYPE_PROTO: ir_type = deserialize_type_proto_for_type(proto.tp) shape = deserialize_type_proto_for_shape(proto.tp) return _core.AttrTypeProto( name, _core.TypeAndShape(ir_type, shape), doc_string=doc_string ) if type_ == _enums.AttributeType.TYPE_PROTOS: type_and_shapes = [] for type_proto in proto.type_protos: ir_type = deserialize_type_proto_for_type(type_proto) shape = deserialize_type_proto_for_shape(type_proto) type_and_shapes.append(_core.TypeAndShape(ir_type, shape)) return _core.AttrTypeProtos(name, type_and_shapes, doc_string=doc_string) if type_ == _enums.AttributeType.UNDEFINED: return _core.Attr(name, type_, None, doc_string=doc_string) raise ValueError(f"Unsupported attribute type: '{type_}'") def deserialize_node(proto: onnx.NodeProto) -> _core.Node: return _deserialize_node(proto, scoped_values=[], value_info={}) @_capture_errors(lambda proto, scoped_values, value_info: str(proto)) def _deserialize_node( proto: onnx.NodeProto, scoped_values: list[dict[str, _core.Value]], value_info: dict[str, onnx.ValueInfoProto], ) -> _core.Node: node_inputs: list[_core.Value | None] = [] for input_name in proto.input: if input_name == "": # Empty input node_inputs.append(None) continue # Find the input in all value scopes found = False for values in reversed(scoped_values): if input_name not in values: continue node_inputs.append(values[input_name]) found = True del values # Remove the reference so it is not used by mistake break if not found: # If the input is not found, we know the graph may be unsorted and # the input may be a supposed-to-be initializer or an output of a node that comes later. # Here we create the value with the name and add it to the current scope. 
# Nodes need to check the value pool for potentially initialized outputs logger.warning( "Input '%s' of node '%s(%s::%s:%s)' not found in any scope. " "The graph may be unsorted. Creating a new input (current depth: %s) .", input_name, proto.name, proto.domain, proto.op_type, getattr(proto, "overload", ""), len(scoped_values), ) if len(scoped_values) > 1: logger.warning( "Caveat: The value is created in the subgraph. If " "the node is referencing a value that is not in the current graph, " "it is impossible to create it in the correct scope.", ) value = _core.Value(name=input_name) # Fill in shape/type information if they exist if input_name in value_info: deserialize_value_info_proto(value_info[input_name], value) node_inputs.append(value) # We can only create the value in the current scope. If the subgraph is # referencing a value that is not in the current scope, it is impossible # to create it in the correct scope. scoped_values[-1][input_name] = value # Build the output values for the node. node_outputs: list[_core.Value] = [] for output_name in proto.output: if output_name == "": # Empty output node_outputs.append(_core.Value(name="")) continue # 1. When the graph is unsorted, we may be able to find the output already created # as an input to some other nodes in the current scope. # Note that a value is always owned by the producing node. Even though a value # can be created when parsing inputs of other nodes, the new node created here # that produces the value will assume ownership. It is then impossible to transfer # the ownership to any other node. # The output can only be found in the current scope. It is impossible for # a node to produce an output that is not in its own scope. current_scope = scoped_values[-1] if output_name in current_scope: value = current_scope[output_name] else: # 2. Common scenario: the graph is sorted and this is the first time we see the output. # Create the value and add it to the current scope. 
value = _core.Value(name=output_name) current_scope[output_name] = value # Fill in shape/type information if they exist if output_name in value_info: deserialize_value_info_proto(value_info[output_name], value) else: logger.debug( "ValueInfoProto not found for output '%s' in node '%s' of type '%s'", output_name, proto.name, proto.op_type, ) node_outputs.append(value) return _core.Node( proto.domain, proto.op_type, node_inputs, [_deserialize_attribute(a, scoped_values) for a in proto.attribute], overload=getattr(proto, "overload", ""), outputs=node_outputs, name=proto.name, doc_string=_get_field(proto, "doc_string"), metadata_props=deserialize_metadata_props(proto.metadata_props), ) # Serialization def serialize_model(model: _protocols.ModelProtocol) -> onnx.ModelProto: return serialize_model_into(onnx.ModelProto(), from_=model) @_capture_errors( lambda model_proto, from_: ( f"ir_version={from_.ir_version}, producer_name={from_.producer_name}, " f"producer_version={from_.producer_version}, domain={from_.domain}, " ) ) def serialize_model_into( model_proto: onnx.ModelProto, from_: _protocols.ModelProtocol ) -> onnx.ModelProto: """Serialize an IR model to an ONNX model proto.""" model_proto.ir_version = from_.ir_version if from_.producer_name: model_proto.producer_name = from_.producer_name if from_.producer_version: model_proto.producer_version = from_.producer_version if from_.domain: model_proto.domain = from_.domain if from_.model_version: model_proto.model_version = from_.model_version if from_.doc_string: model_proto.doc_string = from_.doc_string # Sort names for deterministic serialization _serialize_opset_imports_into(model_proto.opset_import, from_.opset_imports) if from_.metadata_props: _serialize_metadata_props_into(model_proto.metadata_props, from_.metadata_props) serialize_graph_into(model_proto.graph, from_.graph) create_value_info_in_functions = from_.ir_version >= _FUNCTION_VALUE_INFO_SUPPORTED_VERSION for func in from_.functions.values(): 
serialize_function_into( model_proto.functions.add(), from_=func, create_value_info=create_value_info_in_functions, ) if not create_value_info_in_functions: # Create them in the main graph instead _serialize_experimental_value_info_for_function_ir9_into(model_proto.graph, func) return model_proto def _should_create_value_info_for_value(value: _protocols.ValueProtocol) -> bool: """Check if value info should be created for a value. Args: value: The value to check. Returns: True if value info should be created for the value. """ # No need to serialize value info if it is not set if value.shape is None and value.type is None: return False if not value.name: logger.debug("Did not serialize '%s' because its name is empty", value) return False return True def _serialize_experimental_value_info_for_function_ir9_into( graph_proto: onnx.GraphProto, function: _protocols.FunctionProtocol ) -> None: """Serialize value info for functions in an experimental format for IR version 9. Because IRv9 and older does not have ValueInfoProto for functions, we give the value info special names and store them in the main graph instead. The experimental format is: {function_domain}::{function_name}/{value_name} Args: graph_proto: The graph proto to create ValueInfoProto in. function: The function to serialize. """ # TODO(justinchuby): In the future, we can decide if it is a good idea to simply iterate over # all values in the function and call serialize_value_into instead. 
function_qualified_name = f"{function.domain}::{function.name}" def format_name(value_name: str) -> str: return f"{function_qualified_name}/{value_name}" for input in function.inputs: if not input.name: logging.warning( "Function '%s': Value name not set for function input: %s", function_qualified_name, input, ) continue if not _should_create_value_info_for_value(input): # No need to serialize value info if it is not set continue serialize_value_into(graph_proto.value_info.add(), input, name=format_name(input.name)) for node in function: for node_output in node.outputs: if not node_output.name: logging.warning( "Function '%s': Value name not set for node output: %s", function_qualified_name, node_output, ) continue if not _should_create_value_info_for_value(node_output): # No need to serialize value info if it is not set continue serialize_value_into( graph_proto.value_info.add(), node_output, name=format_name(node_output.name), ) def _serialize_opset_imports_into( opset_ids: proto_containers.RepeatedCompositeFieldContainer[onnx.OperatorSetIdProto], from_: Mapping[str, int], ) -> None: """Serialize opset imports into a repeated field of OperatorSetId protos. Args: opset_ids: The repeated field to serialize into. from_: The mapping of opset domains to versions to serialize. """ # Sort names for deterministic serialization for domain, version in from_.items(): opset_ids.add(domain=domain, version=version) def _serialize_metadata_props_into( string_string_entries: proto_containers.RepeatedCompositeFieldContainer[ onnx.StringStringEntryProto ], from_: Mapping[str, str], ) -> None: """Serialize metadata properties into a repeated field of string-string entries. Args: string_string_entries: The repeated field to serialize into. from_: The mapping of metadata properties to serialize. 
""" # Sort names for deterministic serialization for key in sorted(from_): string_string_entries.add(key=key, value=from_[key]) def serialize_graph( graph: _protocols.GraphProtocol | _protocols.GraphViewProtocol, ) -> onnx.GraphProto: """Serializes the given graph into an :class:`onnx.GraphProto`. When the graph initializers do not have `const_value` set, they will be skipped. Args: graph: The graph to be serialized. Returns: The serialized ONNX GraphProto object. """ graph_proto = onnx.GraphProto() serialize_graph_into(graph_proto, from_=graph) return graph_proto @_capture_errors( lambda graph_proto, from_: ( f"name={from_.name}, doc_string={from_.doc_string}, " f"len(inputs)={len(from_.inputs)}, len(initializers)={len(from_.initializers)}, " f"len(nodes)={len(from_)}, len(outputs)={len(from_.outputs)}, metadata_props={from_.metadata_props}" ) ) def serialize_graph_into( graph_proto: onnx.GraphProto, from_: _protocols.GraphProtocol | _protocols.GraphViewProtocol, ) -> None: if from_.name: graph_proto.name = from_.name if from_.doc_string: graph_proto.doc_string = from_.doc_string for input_ in from_.inputs: serialize_value_into(graph_proto.input.add(), input_) # TODO(justinchuby): Support sparse_initializer for initializer in from_.initializers.values(): if initializer.const_value is None: # Skip initializers without constant values logger.warning( "Initializer '%s' does not have a constant value set.", initializer.name ) continue # Make sure the tensor's name is the same as the value's name initializer.const_value.name = initializer.name serialize_tensor_into(graph_proto.initializer.add(), from_=initializer.const_value) for node in from_: serialize_node_into(graph_proto.node.add(), from_=node) for node_output in node.outputs: if not _should_create_value_info_for_value(node_output): # No need to serialize value info if it is not set continue if node_output.is_graph_output(): # No need to serialize value info for these outputs because they are also graph outputs 
continue serialize_value_into(graph_proto.value_info.add(), node_output) for output in from_.outputs: serialize_value_into(graph_proto.output.add(), from_=output) if from_.metadata_props: _serialize_metadata_props_into(graph_proto.metadata_props, from_.metadata_props) def serialize_function( function: _protocols.FunctionProtocol, *, create_value_info: bool = True ) -> onnx.FunctionProto: """Serialize an IR function as a FunctionProto. Args: function: The function to serialize. create_value_info: Whether to create ValueInfoProto for nodes in the function. This is supported starting from ONNX IR version 10. """ function_proto = onnx.FunctionProto() serialize_function_into( function_proto, from_=function, create_value_info=create_value_info ) return function_proto @_capture_errors(lambda function_proto, from_, create_value_info: repr(from_)) def serialize_function_into( function_proto: onnx.FunctionProto, from_: _protocols.FunctionProtocol, *, create_value_info: bool = True, ) -> None: """Serialize an IR function into a FunctionProto. Args: function_proto: The proto to serialize into. from_: The function to serialize. create_value_info: Whether to create ValueInfoProto for nodes in the function. This is supported starting from ONNX IR version 10. """ if from_.domain: function_proto.domain = from_.domain if from_.name: function_proto.name = from_.name if from_.overload: function_proto.overload = from_.overload if from_.doc_string: function_proto.doc_string = from_.doc_string if from_.opset_imports: # A valid ONNX graph should have at least one opset import, that is # the default ONNX opset. 
# Here we check for emptiness before serializing to keep the logic consistent _serialize_opset_imports_into(function_proto.opset_import, from_.opset_imports) if from_.metadata_props: _serialize_metadata_props_into(function_proto.metadata_props, from_.metadata_props) for input_ in from_.inputs: function_proto.input.append(input_.name) if not _should_create_value_info_for_value(input_): # No need to serialize value info if it is not set continue if not create_value_info: continue serialize_value_into(function_proto.value_info.add(), input_) for attr in from_.attributes.values(): if attr.value is not None: serialize_attribute_into(function_proto.attribute_proto.add(), from_=attr) else: # ONNX does not record type information if the attribute does not have a default function_proto.attribute.append(attr.name) for func_output in from_.outputs: function_proto.output.append(func_output.name) # No need to serialize value info for function outputs because they are # also node outputs for node in from_: serialize_node_into(function_proto.node.add(), from_=node) # Record value info for outputs for node_output in node.outputs: if not _should_create_value_info_for_value(node_output): # No need to serialize value info if it is not set continue if not create_value_info: continue serialize_value_into(function_proto.value_info.add(), node_output) def serialize_node(node: _protocols.NodeProtocol) -> onnx.NodeProto: node_proto = onnx.NodeProto() serialize_node_into(node_proto, from_=node) return node_proto def _remove_trailing_outputs( outputs: Sequence[_protocols.ValueProtocol], ) -> Sequence[_protocols.ValueProtocol]: """Remove trailing outputs that have empty names. Args: outputs: The outputs to remove trailing outputs from. Returns: The outputs with trailing outputs removed. 
""" for i, output in enumerate(reversed(outputs)): if output.name: return outputs[: len(outputs) - i] return [] @_capture_errors(lambda node_proto, from_: repr(from_)) def serialize_node_into(node_proto: onnx.NodeProto, from_: _protocols.NodeProtocol) -> None: node_proto.op_type = from_.op_type if from_.domain: # If the domain is "", we can assume the default domain and not set it node_proto.domain = from_.domain if from_.name: node_proto.name = from_.name if from_.overload: node_proto.overload = from_.overload if from_.doc_string: node_proto.doc_string = from_.doc_string if from_.metadata_props: _serialize_metadata_props_into(node_proto.metadata_props, from_.metadata_props) for input_ in from_.inputs: if input_ is None: node_proto.input.append("") else: node_proto.input.append(input_.name) # Do not include the trailing outputs that have empty names for output in _remove_trailing_outputs(from_.outputs): node_proto.output.append(output.name) for attr in from_.attributes.values(): if isinstance(attr, _core.Attr): serialize_attribute_into(node_proto.attribute.add(), from_=attr) elif isinstance(attr, _core.RefAttr): serialize_reference_attribute_into(node_proto.attribute.add(), from_=attr) # Handle protocol attributes for completeness. We do not check them first because # calling isinstance on a protocol can be slow. # Most of the time, we will have Attr or RefAttr so the two branches below # will not be taken. 
elif isinstance(attr, _protocols.AttributeProtocol): serialize_attribute_into(node_proto.attribute.add(), from_=attr) elif isinstance(attr, _protocols.ReferenceAttributeProtocol): serialize_reference_attribute_into(node_proto.attribute.add(), from_=attr) else: raise TypeError(f"Unsupported attribute type: {type(attr)}") def serialize_tensor(tensor: _protocols.TensorProtocol) -> onnx.TensorProto: tensor_proto = onnx.TensorProto() serialize_tensor_into(tensor_proto, from_=tensor) return tensor_proto @_capture_errors(lambda tensor_proto, from_: repr(from_)) def serialize_tensor_into( tensor_proto: onnx.TensorProto, from_: _protocols.TensorProtocol ) -> None: if isinstance(from_, TensorProtoTensor): # Directly copy from the tensor proto if it is available tensor_proto.CopyFrom(from_.raw) if from_.metadata_props: _serialize_metadata_props_into(tensor_proto.metadata_props, from_.metadata_props) return if from_.name: tensor_proto.name = from_.name if from_.doc_string: tensor_proto.doc_string = from_.doc_string tensor_proto.data_type = from_.dtype.value tensor_proto.dims.extend(from_.shape.numpy()) if isinstance(from_, _core.ExternalTensor): # Store external tensors as is tensor_proto.data_location = onnx.TensorProto.EXTERNAL for k, v in { "location": os.fspath(from_.location), "offset": from_.offset, "length": from_.length, }.items(): if v is not None: entry = tensor_proto.external_data.add() entry.key = k entry.value = str(v) elif isinstance(from_, _core.StringTensor): tensor_proto.string_data.extend(from_.string_data()) else: tensor_proto.raw_data = from_.tobytes() _serialize_metadata_props_into(tensor_proto.metadata_props, from_.metadata_props) def serialize_attribute(attribute: _protocols.AttributeProtocol) -> onnx.AttributeProto: attribute_proto = onnx.AttributeProto() serialize_attribute_into(attribute_proto, from_=attribute) return attribute_proto @_capture_errors(lambda attribute_proto, from_: repr(from_)) def serialize_attribute_into( attribute_proto: 
onnx.AttributeProto, from_: _protocols.AttributeProtocol ) -> None: attribute_proto.name = from_.name if from_.doc_string: attribute_proto.doc_string = from_.doc_string _fill_in_value_for_attribute(attribute_proto, from_.type, from_.value) def _fill_in_value_for_attribute( attribute_proto: onnx.AttributeProto, type_: _enums.AttributeType, value: Any ) -> None: if type_ == _enums.AttributeType.INT: # value: int attribute_proto.i = value attribute_proto.type = onnx.AttributeProto.INT elif type_ == _enums.AttributeType.FLOAT: # value: float attribute_proto.f = value attribute_proto.type = onnx.AttributeProto.FLOAT elif type_ == _enums.AttributeType.STRING: # value: str attribute_proto.s = value.encode("utf-8") attribute_proto.type = onnx.AttributeProto.STRING elif type_ == _enums.AttributeType.INTS: # value: Sequence[int] attribute_proto.ints.extend(value) attribute_proto.type = onnx.AttributeProto.INTS elif type_ == _enums.AttributeType.FLOATS: # value: Sequence[float] attribute_proto.floats.extend(value) attribute_proto.type = onnx.AttributeProto.FLOATS elif type_ == _enums.AttributeType.STRINGS: # value: Sequence[str] attribute_proto.strings.extend([s.encode("utf-8") for s in value]) attribute_proto.type = onnx.AttributeProto.STRINGS elif type_ == _enums.AttributeType.TENSOR: # value: _protocols.TensorProtocol serialize_tensor_into(attribute_proto.t, value) attribute_proto.type = onnx.AttributeProto.TENSOR elif type_ == _enums.AttributeType.GRAPH: # value: _protocols.GraphProtocol serialize_graph_into(attribute_proto.g, value) attribute_proto.type = onnx.AttributeProto.GRAPH elif type_ == _enums.AttributeType.TENSORS: # value: Sequence[_protocols.TensorProtocol] for tensor in value: serialize_tensor_into(attribute_proto.tensors.add(), tensor) attribute_proto.type = onnx.AttributeProto.TENSORS elif type_ == _enums.AttributeType.GRAPHS: # value: Sequence[_protocols.GraphProtocol] for graph in value: serialize_graph_into(attribute_proto.graphs.add(), graph) 
attribute_proto.type = onnx.AttributeProto.GRAPHS elif type_ == _enums.AttributeType.SPARSE_TENSOR: raise NotImplementedError( f"Sparse tensors are not supported yet. {_PLEASE_CONTRIBUTE}" ) elif type_ == _enums.AttributeType.SPARSE_TENSORS: raise NotImplementedError( f"Sparse tensors are not supported yet. {_PLEASE_CONTRIBUTE}" ) elif type_ == _enums.AttributeType.TYPE_PROTO: # value: _core.TypeAndShape if value.type is not None: serialize_type_into(attribute_proto.tp, value.type) # Need to create the type _before_ writing the shape if value.shape is not None: serialize_shape_into(attribute_proto.tp, value.shape) attribute_proto.type = onnx.AttributeProto.TYPE_PROTO elif type_ == _enums.AttributeType.TYPE_PROTOS: for ir_type in value: # ir_type: _core.TypeAndShape type_proto = attribute_proto.type_protos.add() if ir_type.type is not None: serialize_type_into(type_proto, ir_type.type) # Need to create the type _before_ writing the shape so that the shape can be written to the leaf type proto if ir_type.shape is not None: serialize_shape_into(type_proto, ir_type.shape) attribute_proto.type = onnx.AttributeProto.TYPE_PROTOS else: raise TypeError(f"Unsupported attribute type: {type_}") @_capture_errors(lambda attribute_proto, from_: repr(from_)) def serialize_reference_attribute_into( attribute_proto: onnx.AttributeProto, from_: _protocols.ReferenceAttributeProtocol ) -> None: attribute_proto.name = from_.name attribute_proto.ref_attr_name = from_.ref_attr_name if from_.doc_string: attribute_proto.doc_string = from_.doc_string attribute_proto.type = typing.cast(onnx.AttributeProto.AttributeType, from_.type.value) def serialize_value(value: _protocols.ValueProtocol, *, name: str = "") -> onnx.ValueInfoProto: """Serialize a value into a ValueInfoProto. Args: value: The proto to serialize into. from_: The value to serialize. name: A custom name to set for the value info. If not provided, the name from the value will be used. 
""" value_info_proto = onnx.ValueInfoProto() serialize_value_into(value_info_proto, value, name=name) return value_info_proto @_capture_errors(lambda value_info_proto, from_: repr(from_)) def serialize_value_into( value_info_proto: onnx.ValueInfoProto, from_: _protocols.ValueProtocol, *, name: str = "", ) -> None: """Serialize a value into a ValueInfoProto. Args: value_info_proto: The proto to serialize into. from_: The value to serialize. name: A custom name to set for the value info. If not provided, the name from the value will be used. """ if name: value_info_proto.name = name else: value_info_proto.name = from_.name if from_.metadata_props: _serialize_metadata_props_into(value_info_proto.metadata_props, from_.metadata_props) if from_.type is not None: serialize_type_into(value_info_proto.type, from_.type) # Need to create the type _before_ writing the shape so that the shape can be written to the leaf type proto if from_.shape is not None: serialize_shape_into(value_info_proto.type, from_.shape) if from_.doc_string: value_info_proto.doc_string = from_.doc_string @_capture_errors(lambda type_proto, from_: repr(from_)) def serialize_type_into(type_proto: onnx.TypeProto, from_: _protocols.TypeProtocol) -> None: if from_.denotation: type_proto.denotation = from_.denotation if isinstance(from_, _core.TensorType): tensor_type_proto = type_proto.tensor_type tensor_type_proto.elem_type = from_.dtype.value elif isinstance(from_, _core.SparseTensorType): sparse_tensor_type_proto = type_proto.sparse_tensor_type sparse_tensor_type_proto.elem_type = from_.dtype.value elif isinstance(from_, _core.SequenceType): sequence_type_proto = type_proto.sequence_type serialize_type_into(sequence_type_proto.elem_type, from_.elem_type) elif isinstance(from_, _core.OptionalType): optional_type_proto = type_proto.optional_type serialize_type_into(optional_type_proto.elem_type, from_.elem_type) else: raise TypeError(f"Unsupported type: {from_}") def serialize_type(type_protocol: 
_protocols.TypeProtocol) -> onnx.TypeProto: type_proto = onnx.TypeProto() serialize_type_into(type_proto, from_=type_protocol) return type_proto @_capture_errors(lambda type_proto, from_: repr(from_)) def serialize_shape_into(type_proto: onnx.TypeProto, from_: _protocols.ShapeProtocol) -> None: value_field = type_proto.WhichOneof("value") tensor_type = getattr(type_proto, value_field) while not isinstance(tensor_type.elem_type, int): # Find the leaf type that has the shape field type_proto = tensor_type.elem_type value_field = type_proto.WhichOneof("value") tensor_type = getattr(type_proto, value_field) # When from is empty, we still need to set the shape field to an empty list by touching it tensor_type.shape.ClearField("dim") for i, dim in enumerate(from_): denotation = from_.get_denotation(i) serialize_dimension_into(tensor_type.shape.dim.add(), dim, denotation) @_capture_errors(lambda dim_proto, dim, denotation: repr(dim_proto)) def serialize_dimension_into( dim_proto: onnx.TensorShapeProto.Dimension, dim: int | _protocols.SymbolicDimProtocol, denotation: str | None = None, ) -> None: if denotation: dim_proto.denotation = denotation if isinstance(dim, int): dim_proto.dim_value = dim elif isinstance(dim, (_core.SymbolicDim, _protocols.SymbolicDimProtocol)): if dim.value is not None: # TODO(justinchuby): None is probably not a valid value for dim_param dim_proto.dim_param = str(dim.value)