from typing import List, Optional, Sequence, Set, Union
from distutils.version import LooseVersion
import os
from codegen import local
from codegen.api.types import (
ArgName,
ArrayCType,
ArrayRefCType,
BaseCType,
BaseTypeToCppMapping,
Binding,
boolT,
ConstRefCType,
CType,
dimnameListT,
intArrayRefT,
iTensorListRefT,
ListCType,
longT,
MutRefCType,
NamedCType,
OptionalCType,
optionalIntArrayRefT,
optionalSymIntArrayRefT,
scalarT,
SpecialArgName,
symIntArrayRefT,
SymIntT,
tensorListT,
tensorOptionsT,
tensorT,
TupleCType,
VectorCType,
voidT,
)
from codegen.model import (
Argument,
Arguments,
BaseTy,
BaseType,
FunctionSchema,
ListType,
NativeFunction,
OptionalType,
Return,
SelfArgument,
TensorOptionsArguments,
Type,
)
from codegen.utils import assert_never
PYTORCH_VERSION = os.environ.get('PYTORCH_VERSION')
def name(
func: FunctionSchema,
*,
faithful_name_for_out_overloads: bool = False,
symint_overload: bool = False,
) -> str:
func_name = str(func.name.name)
if symint_overload:
func_name += "_symint"
if func.is_out_fn():
if faithful_name_for_out_overloads:
func_name += "_outf"
else:
func_name += "_out"
return func_name
def valuetype_type(
t: Type,
*,
binds: ArgName,
remove_non_owning_ref_types: bool = False,
symint: bool = False,
) -> Optional[NamedCType]:
if isinstance(t, BaseType):
if t.name == BaseTy.Tensor or t.name == BaseTy.Scalar:
return None
elif str(t) == "SymInt":
if symint:
return NamedCType(binds, BaseCType(SymIntT))
else:
return NamedCType(binds, BaseCType(longT))
if remove_non_owning_ref_types:
if t.name == BaseTy.str:
raise AssertionError(
"string ref->value conversion: not implemented yet"
)
return NamedCType(binds, BaseCType(BaseTypeToCppMapping[t.name]))
elif isinstance(t, OptionalType):
elem = valuetype_type(t.elem, binds=binds, symint=symint)
if elem is None:
return None
return NamedCType(binds, OptionalCType(elem.type))
elif isinstance(t, ListType):
if str(t.elem) == "bool":
if t.size is None:
raise ValueError("t.size is None")
return NamedCType(binds, ArrayCType(BaseCType(boolT), t.size))
else:
return None
else:
raise AssertionError(f"unrecognized type {repr(t)}")
def argumenttype_type(
t: Type,
*,
mutable: bool,
binds: ArgName,
remove_non_owning_ref_types: bool = False,
symint: bool = False,
) -> NamedCType:
r = valuetype_type(
t,
binds=binds,
symint=symint,
remove_non_owning_ref_types=remove_non_owning_ref_types,
)
if r is not None:
return r
if isinstance(t, BaseType):
if t.name == BaseTy.Tensor:
if mutable and not local.use_const_ref_for_mutable_tensors():
return NamedCType(binds, MutRefCType(BaseCType(tensorT)))
else:
return NamedCType(binds, ConstRefCType(BaseCType(tensorT)))
elif t.name == BaseTy.Scalar:
return NamedCType(binds, ConstRefCType(BaseCType(scalarT)))
else:
raise AssertionError(f"base type should have been value type {t}")
elif isinstance(t, OptionalType):
if str(t.elem) == "Tensor":
if mutable and not local.use_const_ref_for_mutable_tensors():
return NamedCType(
binds, MutRefCType(BaseCType(tensorT))
)
else:
return NamedCType(
binds, ConstRefCType(OptionalCType(BaseCType(tensorT)))
)
elif str(t.elem) == "Scalar":
return NamedCType(binds, ConstRefCType(OptionalCType(BaseCType(scalarT))))
elif LooseVersion(PYTORCH_VERSION) >= LooseVersion('2.0'):
if isinstance(t.elem, ListType) and str(t.elem.elem) == "int":
return NamedCType(binds, BaseCType(optionalIntArrayRefT))
elif isinstance(t.elem, ListType) and str(t.elem.elem) == "SymInt":
if symint:
return NamedCType(binds, BaseCType(optionalSymIntArrayRefT))
else:
return NamedCType(binds, BaseCType(optionalIntArrayRefT))
elem = argumenttype_type(t.elem, mutable=mutable, binds=binds, symint=symint)
return NamedCType(binds, OptionalCType(elem.type))
elif isinstance(t, ListType):
if str(t.elem) == "int":
if remove_non_owning_ref_types:
return NamedCType(binds, VectorCType(BaseCType(longT)))
else:
return NamedCType(binds, BaseCType(intArrayRefT))
if str(t.elem) == "SymInt":
if remove_non_owning_ref_types:
if symint:
return NamedCType(binds, VectorCType(BaseCType(SymIntT)))
else:
return NamedCType(binds, VectorCType(BaseCType(longT)))
else:
if symint:
return NamedCType(binds, BaseCType(symIntArrayRefT))
else:
return NamedCType(binds, BaseCType(intArrayRefT))
if str(t.elem) == "Tensor":
if LooseVersion(PYTORCH_VERSION) < LooseVersion('2.0'):
return NamedCType(binds, BaseCType(tensorListT))
if local.use_ilistref_for_tensor_lists():
return NamedCType(binds, ConstRefCType(BaseCType(iTensorListRefT)))
else:
return NamedCType(binds, BaseCType(tensorListT))
elif str(t.elem) == "Scalar":
return NamedCType(binds, ArrayRefCType(BaseCType(scalarT)))
elif str(t.elem) == "Dimname":
return NamedCType(binds, BaseCType(dimnameListT))
elif str(t.elem) == "Tensor?":
return NamedCType(
binds, ConstRefCType(ListCType(OptionalCType(BaseCType(tensorT))))
)
elem = argumenttype_type(t.elem, mutable=mutable, binds=binds, symint=symint)
return NamedCType(binds, ArrayRefCType(elem.type))
else:
raise AssertionError(f"unrecognized type {repr(t)}")
def argument_type(a: Argument, *, binds: ArgName, symint: bool = False) -> NamedCType:
return argumenttype_type(a.type, mutable=a.is_write, symint=symint, binds=binds)
def returntype_type(t: Type, *, mutable: bool, symint: bool = False) -> CType:
r = valuetype_type(t, binds="__placeholder__", symint=True)
if r is not None:
return r.type
if isinstance(t, BaseType):
if t.name == BaseTy.Tensor:
if mutable:
if local.use_const_ref_for_mutable_tensors():
return ConstRefCType(BaseCType(tensorT))
else:
return MutRefCType(BaseCType(tensorT))
else:
return BaseCType(tensorT)
elif t.name == BaseTy.Scalar:
return BaseCType(scalarT)
elif isinstance(t, ListType):
if mutable:
raise ValueError("Native functions should never return a mutable tensor list. They should return void.")
elem = returntype_type(t.elem, mutable=False)
if t.size is not None:
raise ValueError(f"fixed size list returns not supported: {t}")
return VectorCType(elem)
raise AssertionError(f"unrecognized return type {t}")
def return_type(r: Return, *, symint: bool = False) -> CType:
return returntype_type(r.type, mutable=r.is_write, symint=symint)
def returns_type(rs: Sequence[Return], *, symint: bool = False) -> CType:
if len(rs) == 0:
return BaseCType(voidT)
elif len(rs) == 1:
return return_type(rs[0], symint=symint)
else:
return TupleCType([return_type(r, symint=symint) for r in rs])
def return_names(f: NativeFunction, *, fallback_name: str = "result") -> Sequence[str]:
returns: List[str] = []
for i, r in enumerate(f.func.returns):
if f.func.name.name.inplace:
if i != 0:
raise ValueError("illegal inplace function with multiple returns")
return_name = "self"
elif f.func.is_out_fn():
return_name = f.func.arguments.out[i].name
elif r.name:
name_conflict = any(
r.name == a.name for a in f.func.schema_order_arguments()
)
if name_conflict and not f.func.is_out_fn():
return_name = f"{r.name}_return"
else:
return_name = r.name
else:
return_name = fallback_name if len(f.func.returns) == 1 else f"{fallback_name}{i}"
returns.append(return_name)
return returns
JIT_TO_CPP_DEFAULT = {
"False": "false",
"True": "true",
"None": "c10::nullopt",
"Mean": "at::Reduction::Mean",
"[]": "{}",
"contiguous_format": "at::MemoryFormat::Contiguous",
"long": "at::kLong",
}
def default_expr(d: str, t: Type, *, symint: bool) -> str:
if d == "None" and str(t) == "Tensor?":
return "{}"
if isinstance(t, BaseType) and t.name is BaseTy.str:
if len(d) >= 2 and d[0] == "'" and d[-1] == "'":
s = ""
i = 1
while i + 1 < len(d):
if d[i] != "\\":
if d[i] == '"':
s += '\\"'
else:
s += d[i]
i += 1
else:
if d[i + 1] == "'":
s += "'"
else:
s += d[i : i + 2]
i += 2
return f'"{s}"'
if isinstance(t, OptionalType):
if d == "None":
return "c10::nullopt"
return default_expr(d, t.elem, symint=symint)
if isinstance(t, ListType):
if d.startswith("[") and d.endswith("]"):
return "{" + d[1:-1] + "}"
elif symint and d.isdigit() and str(t.elem) == "SymInt":
return f"c10::SymInt({d})"
elif t.size is None:
raise ValueError(f"Expected a list default '[...]' but found: '{d}'")
return JIT_TO_CPP_DEFAULT.get(d, d)
def argument(
a: Union[Argument, TensorOptionsArguments, SelfArgument],
*,
cpp_no_default_args: Set[str],
method: bool,
faithful: bool,
symint: bool = False,
has_tensor_options: bool,
) -> List[Binding]:
def sub_argument(
a: Union[Argument, TensorOptionsArguments, SelfArgument]
) -> List[Binding]:
return argument(
a,
cpp_no_default_args=cpp_no_default_args,
method=method,
faithful=faithful,
symint=symint,
has_tensor_options=has_tensor_options,
)
if isinstance(a, Argument):
binds: ArgName
if a.name == "memory_format" and has_tensor_options:
binds = SpecialArgName.possibly_redundant_memory_format
else:
binds = a.name
default: Optional[str] = None
if a.name not in cpp_no_default_args and a.default is not None:
default = default_expr(a.default, a.type, symint=symint)
return [
Binding(
nctype=argument_type(a, binds=binds, symint=symint),
name=a.name,
default=default,
argument=a,
)
]
elif isinstance(a, TensorOptionsArguments):
if faithful:
return (
sub_argument(a.dtype)
+ sub_argument(a.layout)
+ sub_argument(a.device)
+ sub_argument(a.pin_memory)
)
else:
default = None
if "options" in cpp_no_default_args:
raise ValueError("options in cpp_no_default_args")
if all(x.default == "None" for x in a.all()):
default = "{}"
elif a.dtype.default == "long":
default = "at::kLong"
return [
Binding(
nctype=NamedCType("options", BaseCType(tensorOptionsT)),
name="options",
default=default,
argument=a,
)
]
elif isinstance(a, SelfArgument):
if method:
return []
else:
return sub_argument(a.argument)
else:
assert_never(a)
def arguments(
param_arguments: Arguments,
*,
faithful: bool,
symint: bool = False,
method: bool,
cpp_no_default_args: Set[str],
) -> List[Binding]:
args: List[Union[Argument, TensorOptionsArguments, SelfArgument]] = []
if faithful:
args.extend(param_arguments.non_out)
args.extend(param_arguments.out)
else:
args.extend(param_arguments.out)
args.extend(param_arguments.non_out)
res = []
for a in args:
for r in argument(
a,
faithful=faithful,
symint=symint,
method=method,
has_tensor_options=param_arguments.tensor_options is not None,
cpp_no_default_args=cpp_no_default_args,
):
res.append(r.no_default() if faithful else r)
return res