from collections import namedtuple
import itertools
import os
import re
import torch._C as C
from torch.testing._internal.common_utils import TestCase, run_tests
from torch._python_dispatcher import PythonDispatcher
import torch.utils.cpp_extension
import torch_npu
import torch_npu.testing
Result = namedtuple('Result', 'state table provenance')
dispatch_keys_to_check = (
'Undefined',
'CPU',
'CUDA',
'XLA',
'AutogradOther',
'AutogradCPU',
'AutogradCUDA',
'AutogradXLA')
def extract_dispatch_table_with_keys(table, dispatch_keys):
extracted = ''
table_entries = table.split('\n')
regex = re.compile(r"registered at .*FallbackKernel\.cpp.*(\[)")
for k in dispatch_keys:
for t in table_entries:
if t.startswith(k):
entry = regex.sub('registered in pytorch framework [', t)
extracted += (entry + '\n')
return extracted
class TestDispatch(TestCase):
namespace_index = 0
def test_all_invariants(self):
C._dispatch_check_all_invariants()
def run_ops(self, name, ops, ctor_order=None, dtor_order=None,
results=None, expect_raises=False):
"""
Given a list of operator registrations, run the registrations in the
order specified by ctor_order, and then run the deregistrations in
dtor_order.
If results is specified, intermediate results are checked for consistency
with results stored in results (and stored in results if this is the
first time we've seen them). Results are expected to be equivalent
modulo commutativity and inverses (thus, results is keyed on a frozenset
of in effect registrations from ops). Results stores namedtuple
Result[state, table, provenance], where state is a string that contains
non-derived kernel registered or error message if it doesn't pass;
table is a string that contains computed dispatch table entries;
provenance is a string that describes how exactly we got this string.
If expect_raises is True, it is not an error to raise an exception. Instead,
we'll store the exception string (instead of the dispatcher state)
in results. In principle we should flag these differently, but it's
very obvious when you get an error in one case but not another.
"""
self.__class__.namespace_index += 1
if results is None:
results = {}
if ctor_order is None:
ctor_order = list(range(len(ops)))
if dtor_order is None:
dtor_order = list(reversed(ctor_order))
refs = [None] * len(ops)
active_ops = set()
test_namespace = f"__test{self.namespace_index}__"
def check_invariants(actual_provenance):
C._dispatch_check_invariants(name)
actual_state = C._dispatch_dump(
f"{test_namespace}::{name}").replace(test_namespace, "test")
actual_table = C._dispatch_dump_table(
f"{test_namespace}::{name}").replace(test_namespace, "test")
expected_state, expected_table, expected_provenance = results.setdefault(
frozenset(active_ops),
Result(actual_state, actual_table, actual_provenance)
)
self.assertMultiLineEqual(
expected_state, actual_state,
f"expected from {expected_provenance}; actual from {actual_provenance}"
)
self.assertMultiLineEqual(
expected_table, actual_table,
f"expected from {expected_provenance}; actual from {actual_provenance}"
)
results.setdefault(frozenset(), Result("", "", "hardcoded initial state"))
check_invariants("initial state")
set_to_report = frozenset(range(len(ops)))
for i, op_ix in enumerate(ctor_order):
refs[op_ix] = C._dispatch_library("FRAGMENT", test_namespace, "")
active_ops.add(op_ix)
try:
ops[op_ix](refs[op_ix])
check_invariants(f"running ctors {ctor_order[:i + 1]}")
except RuntimeError as e:
if not expect_raises:
raise
actual = str(e).replace(test_namespace, "test")
actual = actual.split("\nException raised from ")[0]
expected, _, expected_provenance = results.setdefault(
frozenset(active_ops),
Result(actual, "", f"error after running ctors {ctor_order[:i + 1]}")
)
self.assertMultiLineEqual(expected, actual, expected_provenance)
set_to_report = frozenset(active_ops)
active_ops.remove(op_ix)
check_invariants(
f"running ctors {ctor_order[:i]} and then failing to run ctor {op_ix} "
"(did this failure leave the dispatcher in a wedged state? "
"it shouldn't!)"
)
break
last_ctor = i
if expect_raises and len(active_ops) == len(ops):
refs = None
self.assertTrue(
False,
"expected exception to be raised, but nothing was raised "
f"(after running ctors {ctor_order})")
for i, op_ix in enumerate(dtor_order):
refs[op_ix] = None
if expect_raises:
active_ops.discard(op_ix)
else:
active_ops.remove(op_ix)
check_invariants(
f"running ctors {ctor_order[:last_ctor + 1]}, then running dtors {dtor_order[:i + 1]}"
)
return results[set_to_report][0]
def commute(self, name, ops, ctor_order=None, expect_raises=False):
results = {}
def go(ctor_order):
for dtor_order in itertools.permutations(range(len(ops))):
self.run_ops(
name, ops, ctor_order, dtor_order,
results=results, expect_raises=expect_raises)
if ctor_order is not None:
go(ctor_order)
else:
for ctor_order in itertools.permutations(range(len(ops))):
go(ctor_order)
return results[frozenset(range(len(ops)))]
def test_def(self):
state = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo"),
lambda m: m.impl_t_t("foo", dispatch="CPU"),
lambda m: m.impl_t_t("foo", dispatch="Autograd"),
lambda m: m.impl_t_t("foo", dispatch="AutogradCPU")
]).state
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
CPU: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
AutogradCPU: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
Autograd[alias]: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
def test_def_impl_schema_mismatch(self):
state = self.commute("foo", [
lambda m: m.def_("foo(Tensor x, Tensor y) -> Tensor"),
lambda m: m.impl_t_t("foo"),
], expect_raises=True).state
self.assertExpectedInline(state, '''\
Inferred operator schema for a C++ kernel function doesn't match the expected function schema.
operator: test::foo
expected schema: test::foo(Tensor x, Tensor y) -> Tensor
registered at /dev/null:0
inferred schema: (Tensor _0) -> Tensor _0
impl_t_t
reason: The number of arguments is different. 2 vs 1.''')
def test_def_with_inference(self):
state = self.commute("foo", [
lambda m: m.def_name_t_t("foo"),
lambda m: m.impl_t_t("foo", "CPU"),
lambda m: m.impl_t_t("foo", "Autograd"),
lambda m: m.impl_t_t("foo", "AutogradCPU")
]).state
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor _0) -> Tensor _0
debug: registered at /dev/null:0
alias analysis kind: CONSERVATIVE
CPU: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
AutogradCPU: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
Autograd[alias]: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: default_def_name_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
def test_def_only(self):
state = self.commute("foo", [
lambda m: m.def_("foo(Tensor x, Tensor y) -> Tensor"),
]).state
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x, Tensor y) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
''')
def test_impl_only(self):
state = self.commute("foo", [
lambda m: m.impl_t_t("foo"),
lambda m: m.impl_t_t("foo", "CPU"),
lambda m: m.impl_t_t("foo", "Autograd"),
lambda m: m.impl_t_t("foo", "AutogradCPU")
]).state
self.assertExpectedInline(state, '''\
name: test::foo
schema: (none)
CPU: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
AutogradCPU: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
Autograd[alias]: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
def test_computed_table(self):
result = self.commute("foo", [
lambda m: m.def_name_t_t("foo"),
lambda m: m.impl_t_t("foo", "CPU", debug="fn_cpu"),
lambda m: m.impl_t_t("foo", "XLA", debug="fn_xla"),
lambda m: m.impl_t_t("foo", "Autograd", debug="fn_autograd"),
lambda m: m.impl_t_t("foo", "AutogradCPU", debug="fn_autogradcpu")
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor _0) -> Tensor _0
debug: registered at /dev/null:0
alias analysis kind: CONSERVATIVE
CPU: fn_cpu :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
XLA: fn_xla :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
AutogradCPU: fn_autogradcpu :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
Autograd[alias]: fn_autograd :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: default_def_name_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
Undefined: default_def_name_t_t [math kernel]
CPU: fn_cpu [kernel]
CUDA: default_def_name_t_t [math kernel]
XLA: fn_xla [kernel]
AutogradOther: default_def_name_t_t [math kernel]
AutogradCPU: fn_autogradcpu [kernel]
AutogradCUDA: default_def_name_t_t [math kernel]
AutogradXLA: fn_autograd [autograd kernel]
''')
def test_computed_table_with_cpu_math_autogradcpu_fallthrough(self):
global_m = C._dispatch_library("IMPL", "_", "AutogradCPU")
result = self.commute("foo", [
lambda m: m.def_name_t_t("foo"),
lambda m: m.impl_t_t("foo", "CPU"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor _0) -> Tensor _0
debug: registered at /dev/null:0
alias analysis kind: CONSERVATIVE
CPU: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: default_def_name_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
Undefined: default_def_name_t_t [math kernel]
CPU: impl_t_t [kernel]
CUDA: default_def_name_t_t [math kernel]
XLA: default_def_name_t_t [math kernel]
AutogradOther: default_def_name_t_t [math kernel]
AutogradCPU: registered in pytorch framework [backend fallback]
AutogradCUDA: default_def_name_t_t [math kernel]
AutogradXLA: default_def_name_t_t [math kernel]
''')
def test_computed_table_with_math(self):
global_m = C._dispatch_library("IMPL", "_", "AutogradCPU")
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "CompositeImplicitAutograd"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
CompositeImplicitAutograd[alias]: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
Undefined: impl_t_t [math kernel]
CPU: impl_t_t [math kernel]
CUDA: impl_t_t [math kernel]
XLA: impl_t_t [math kernel]
AutogradOther: impl_t_t [math kernel]
AutogradCPU: impl_t_t [math kernel]
AutogradCUDA: impl_t_t [math kernel]
AutogradXLA: impl_t_t [math kernel]
''')
def test_computed_table_with_cpu_math(self):
global_m = C._dispatch_library("IMPL", "_", "AutogradCPU")
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "CPU", debug="fn_cpu"),
lambda m: m.impl_t_t("foo", "CompositeImplicitAutograd", debug="fn_math"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
CPU: fn_cpu :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: fn_math :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
Undefined: fn_math [math kernel]
CPU: fn_cpu [kernel]
CUDA: fn_math [math kernel]
XLA: fn_math [math kernel]
AutogradOther: fn_math [math kernel]
AutogradCPU: registered in pytorch framework [backend fallback]
AutogradCUDA: fn_math [math kernel]
AutogradXLA: fn_math [math kernel]
''')
def test_computed_table_with_autograd(self):
global_m = C._dispatch_library("IMPL", "_", "AutogradCPU")
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "Autograd"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
Autograd[alias]: impl_t_t :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
AutogradOther: impl_t_t [autograd kernel]
AutogradCPU: impl_t_t [autograd kernel]
AutogradCUDA: impl_t_t [autograd kernel]
AutogradXLA: impl_t_t [autograd kernel]
''')
def test_computed_table_with_cpu_autograd_math(self):
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "CPU", debug="fn_cpu"),
lambda m: m.impl_t_t("foo", "Autograd", debug="fn_autograd"),
lambda m: m.impl_t_t("foo", "CompositeImplicitAutograd", debug="fn_math"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
CPU: fn_cpu :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
Autograd[alias]: fn_autograd :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: fn_math :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
Undefined: fn_math [math kernel]
CPU: fn_cpu [kernel]
CUDA: fn_math [math kernel]
XLA: fn_math [math kernel]
AutogradOther: fn_math [math kernel]
AutogradCPU: fn_autograd [autograd kernel]
AutogradCUDA: fn_math [math kernel]
AutogradXLA: fn_math [math kernel]
''')
def test_computed_table_with_ambiguous_autogradother(self):
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "CompositeImplicitAutograd", debug="fn_math"),
lambda m: m.impl_t_t("foo", "FPGA", debug="fn_fpga"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
FPGA: fn_fpga :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: fn_math :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check + ('FPGA',))
self.assertExpectedInline(extracted_table, '''\
Undefined: fn_math [math kernel]
CPU: fn_math [math kernel]
CUDA: fn_math [math kernel]
XLA: fn_math [math kernel]
AutogradOther: ambiguous_autogradother [ambiguous autogradother]
AutogradCPU: fn_math [math kernel]
AutogradCUDA: fn_math [math kernel]
AutogradXLA: fn_math [math kernel]
FPGA: fn_fpga [kernel]
''')
def test_computed_table_with_cpu_defaultbackend(self):
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "CPU", debug="fn_cpu"),
lambda m: m.impl_t_t("foo", "CompositeExplicitAutograd", debug="fn_defaultbackend"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
CPU: fn_cpu :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeExplicitAutograd[alias]: fn_defaultbackend :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
Undefined: fn_defaultbackend [default backend kernel]
CPU: fn_cpu [kernel]
CUDA: fn_defaultbackend [default backend kernel]
XLA: fn_defaultbackend [default backend kernel]
AutogradOther: registered in pytorch framework [backend fallback]
AutogradCPU: registered in pytorch framework [backend fallback]
AutogradCUDA: registered in pytorch framework [backend fallback]
AutogradXLA: registered in pytorch framework [backend fallback]
''')
def test_computed_table_with_cpu_autograd_defaultbackend(self):
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "CPU", debug="fn_cpu"),
lambda m: m.impl_t_t("foo", "Autograd", debug="fn_autograd"),
lambda m: m.impl_t_t("foo", "CompositeExplicitAutograd", debug="fn_defaultbackend"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
CPU: fn_cpu :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
Autograd[alias]: fn_autograd :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeExplicitAutograd[alias]: fn_defaultbackend :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check + ('FPGA',))
self.assertExpectedInline(extracted_table, '''\
Undefined: fn_defaultbackend [default backend kernel]
CPU: fn_cpu [kernel]
CUDA: fn_defaultbackend [default backend kernel]
XLA: fn_defaultbackend [default backend kernel]
AutogradOther: fn_autograd [autograd kernel]
AutogradCPU: fn_autograd [autograd kernel]
AutogradCUDA: fn_autograd [autograd kernel]
AutogradXLA: fn_autograd [autograd kernel]
FPGA: fn_defaultbackend [default backend kernel]
''')
def test_computed_table_with_cpu_autograd_math_defaultbackend(self):
result = self.commute("foo", [
lambda m: m.def_("foo(Tensor x) -> Tensor"),
lambda m: m.impl_t_t("foo", "CPU", debug="fn_cpu"),
lambda m: m.impl_t_t("foo", "Autograd", debug="fn_autograd"),
lambda m: m.impl_t_t("foo", "CompositeImplicitAutograd", debug="fn_math"),
lambda m: m.impl_t_t("foo", "CompositeExplicitAutograd", debug="fn_defaultbackend"),
])
state, table = result.state, result.table
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: FROM_SCHEMA
CPU: fn_cpu :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
Autograd[alias]: fn_autograd :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias]: fn_math :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeExplicitAutograd[alias]: fn_defaultbackend :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
''')
extracted_table = extract_dispatch_table_with_keys(table, dispatch_keys_to_check)
self.assertExpectedInline(extracted_table, '''\
Undefined: fn_defaultbackend [default backend kernel]
CPU: fn_cpu [kernel]
CUDA: fn_defaultbackend [default backend kernel]
XLA: fn_defaultbackend [default backend kernel]
AutogradOther: fn_autograd [autograd kernel]
AutogradCPU: fn_autograd [autograd kernel]
AutogradCUDA: fn_autograd [autograd kernel]
AutogradXLA: fn_autograd [autograd kernel]
''')
def test_multiple_def_error(self):
ops = [
lambda m: m.def_("foo(Tensor x, Tensor y) -> Tensor"),
lambda m: m.def_("foo(Tensor x, Tensor y) -> Tensor"),
]
self.assertExpectedInline(
self.commute("foo", ops, expect_raises=True).state,
'''Tried to register an operator (test::foo(Tensor x, Tensor y) -> Tensor) with the same name and overload '''
'''name multiple times. Each overload's schema should only be registered with a single call to def(). '''
'''Duplicate registration: registered at /dev/null:0. Original registration: registered at /dev/null:0'''
)
def test_def_with_explicit_alias(self):
state = self.commute("foo", [
lambda m: m.def_("foo(Tensor x, Tensor y) -> Tensor",
alias="PURE_FUNCTION")
]).state
self.assertExpectedInline(state, '''\
name: test::foo
schema: test::foo(Tensor x, Tensor y) -> Tensor
debug: registered at /dev/null:0
alias analysis kind: PURE_FUNCTION
''')
def test_multiple_def_alias_defaulting(self):
ops = [
lambda m: m.def_("foo(Tensor x) -> Tensor", alias="PURE_FUNCTION"),
lambda m: m.def_legacy("foo(Tensor x) -> Tensor"),
]
self.assertExpectedInline(
self.commute("foo", ops, expect_raises=True).state,
'''Tried to register an operator (test::foo(Tensor x) -> Tensor) with the same name and overload '''
'''name multiple times. Each overload's schema should only be registered with a single call to def(). '''
'''Duplicate registration: registered at /dev/null:0. Original registration: registered at /dev/null:0'''
)
def test_multiple_def_alias_mismatch(self):
ops = [
lambda m: m.def_("foo(Tensor x) -> Tensor", alias="PURE_FUNCTION"),
lambda m: m.def_("foo(Tensor x) -> Tensor", alias="CONSERVATIVE"),
]
self.assertExpectedInline(
self.commute("foo", ops, expect_raises=True).state,
'''Tried to register an operator (test::foo(Tensor x) -> Tensor) with the same name and overload '''
'''name multiple times. Each overload's schema should only be registered with a single call to def(). '''
'''Duplicate registration: registered at /dev/null:0. Original registration: registered at /dev/null:0'''
)
def test_multiple_fallback(self):
global_m = C._dispatch_library("IMPL", "_", "XLA")
global_m.fallback_fallthrough(),
try:
global_m.fallback_fallthrough(),
except RuntimeError as e:
self.assertExpectedInline(
str(e),
'''Tried to register multiple backend fallbacks for the same dispatch key XLA; previous registration '''
'''registered at /dev/null:0, new registration registered at /dev/null:0'''
)
else:
self.assertTrue(False)
def test_overwrite_math(self):
ops = [
lambda m: m.impl_t_t("foo", debug="fn1"),
lambda m: m.impl_t_t("foo", debug="fn2"),
]
self.assertExpectedInline(
self.commute("foo", ops, ctor_order=(0, 1)).state,
'''\
name: test::foo
schema: (none)
CompositeImplicitAutograd[alias]: fn2 :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
CompositeImplicitAutograd[alias] (inactive): fn1 :: (Tensor _0) -> Tensor _0 [ boxed unboxed ]
'''
)
def test_find_dangling_impls(self):
dangling_impls = C._dispatch_find_dangling_impls()
self.assertEqual(
0,
len(dangling_impls),
msg=f"Expect zero dangling impls, but found: {dangling_impls}"
)
def test_find_dangling_impls_ext(self):
extension_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cpp_extensions', 'dangling_impl_extension.cpp')
module = torch.utils.cpp_extension.load(
name="dangling_impl_extension",
sources=[
extension_path,
],
extra_cflags=["-g"],
verbose=True,
)
impls = C._dispatch_find_dangling_impls()
self.assertEqual(1, len(impls))
self.assertEqual(
f'''\
name: __test::foo
schema: (none)
CPU: registered at {extension_path}:5 :: () -> () [ boxed unboxed ]
''',
impls[0])
def test_dispatch_print_registrations_for_dispatch_key_invalid(self):
with self.assertRaisesRegex(
RuntimeError,
"could not parse dispatch key: invalid_key"):
C._dispatch_print_registrations_for_dispatch_key('invalid_key')
class TestPythonDispatcher(TestCase):
def test_basic(self):
dispatcher = PythonDispatcher()
dispatcher.register(["CPU", "XLA", "Lazy", "CompositeImplicitAutograd"])
self.assertExpectedInline(
dispatcher.dispatchTable(),
'''\
Computed Dispatch Table
key kernel
---------------------------
CPU fn_CPU [kernel]
XLA fn_XLA [kernel]
Lazy fn_Lazy [kernel]
FPGA fn_CompositeImplicitAutograd [math kernel]
AutogradOther fn_CompositeImplicitAutograd [math kernel]
AutogradCPU [backend fallback]
AutogradXLA [backend fallback]
AutogradLazy [backend fallback]
'''
)
def test_math_autogradcpu(self):
dispatcher = PythonDispatcher()
dispatcher.register(["CPU", "XLA", "Lazy", "CompositeImplicitAutograd", "AutogradCPU"])
self.assertExpectedInline(
dispatcher.dispatchTable(),
'''\
Computed Dispatch Table
key kernel
---------------------------
CPU fn_CPU [kernel]
XLA fn_XLA [kernel]
Lazy fn_Lazy [kernel]
FPGA fn_CompositeImplicitAutograd [math kernel]
AutogradOther fn_CompositeImplicitAutograd [math kernel]
AutogradCPU fn_AutogradCPU [kernel]
AutogradXLA [backend fallback]
AutogradLazy [backend fallback]
'''
)
self.assertExpectedInline(
dispatcher.registrations(),
'''\
Registered Kernels
key kernel
---------------------------
CPU fn_CPU
XLA fn_XLA
Lazy fn_Lazy
AutogradCPU fn_AutogradCPU
CompositeImplicitAutograd[alias] fn_CompositeImplicitAutograd
'''
)
def test_defaultbackend_autogradcpu(self):
dispatcher = PythonDispatcher()
dispatcher.register(["CPU", "XLA", "Lazy", "CompositeExplicitAutograd", "AutogradCPU"])
self.assertExpectedInline(
dispatcher.dispatchTable(),
'''\
Computed Dispatch Table
key kernel
---------------------------
CPU fn_CPU [kernel]
XLA fn_XLA [kernel]
Lazy fn_Lazy [kernel]
FPGA fn_CompositeExplicitAutograd [default backend kernel]
AutogradOther [backend fallback]
AutogradCPU fn_AutogradCPU [kernel]
AutogradXLA [backend fallback]
AutogradLazy [backend fallback]
'''
)
self.assertExpectedInline(
dispatcher.registrations(),
'''\
Registered Kernels
key kernel
---------------------------
CPU fn_CPU
XLA fn_XLA
Lazy fn_Lazy
AutogradCPU fn_AutogradCPU
CompositeExplicitAutograd[alias] fn_CompositeExplicitAutograd
'''
)
def test_autogradother(self):
dispatcher = PythonDispatcher()
dispatcher.register(["CPU", "FPGA", "CompositeImplicitAutograd"])
self.assertExpectedInline(
dispatcher.dispatchTable(),
'''\
Computed Dispatch Table
key kernel
---------------------------
CPU fn_CPU [kernel]
XLA fn_CompositeImplicitAutograd [math kernel]
Lazy fn_CompositeImplicitAutograd [math kernel]
FPGA fn_FPGA [kernel]
AutogradOther ambiguous_autogradother [ambiguous autogradother]
AutogradCPU [backend fallback]
AutogradXLA fn_CompositeImplicitAutograd [math kernel]
AutogradLazy fn_CompositeImplicitAutograd [math kernel]
'''
)
self.assertExpectedInline(
dispatcher.registrations(),
'''\
Registered Kernels
key kernel
---------------------------
FPGA fn_FPGA
CPU fn_CPU
CompositeImplicitAutograd[alias] fn_CompositeImplicitAutograd
'''
)
def test_duplicate_registrations(self):
dispatcher = PythonDispatcher()
with self.assertRaisesRegex(RuntimeError, r"Overriden is not allowed"):
dispatcher.register(["CPU", "CPU"])
def test_defaultbackend_math(self):
dispatcher = PythonDispatcher()
with self.assertRaisesRegex(
RuntimeError,
r"Registration to both CompositeImplicitAutograd and CompositeExplicitAutograd is not allowed"):
dispatcher.register(["CompositeExplicitAutograd", "CompositeImplicitAutograd"])
def test_quantized_structured_not_implemented(self):
x = torch.zeros([1, 1, 1])
y = torch.zeros([1, 1, 1])
scale, zero_point = 1.0, 0
dtype = torch.qint8
qx = torch.quantize_per_tensor(x, scale, zero_point, dtype)
qy = torch.quantize_per_tensor(y, scale, zero_point, dtype)
self.assertRaisesRegex(
NotImplementedError,
"Could not run 'aten::bmm.out' with arguments from the 'QuantizedCPU' backend.",
lambda: torch.bmm(qx, qy)
)
if __name__ == '__main__':
run_tests()