from __future__ import annotations
import re
from typing import Any
try:
from .validation_model import (
SCHEMA_ALLOWED_KEYS,
SUPPORTED_SCHEMA_TYPES,
SUPPORTED_STRING_FORMATS,
ValidationResult,
is_integer_value,
is_number_value,
json_pointer,
)
except ImportError:
from validation_model import (
SCHEMA_ALLOWED_KEYS,
SUPPORTED_SCHEMA_TYPES,
SUPPORTED_STRING_FORMATS,
ValidationResult,
is_integer_value,
is_number_value,
json_pointer,
)
def validate_schema_node(
schema: Any,
path: str,
result: ValidationResult,
*,
skip_paths: set[str] | None = None,
restrict_array_items_simple: bool = False,
) -> None:
if not isinstance(schema, dict):
result.add_issue("SCH001", path, "schema node must be an object")
return
schema_type = _validate_schema_type(schema, path, result)
if schema_type is None:
return
_validate_schema_keywords(schema, schema_type, path, result)
_validate_description(schema, path, result)
_validate_default(schema, schema_type, path, result)
_validate_enum(schema, schema_type, path, result)
_validate_type_specific_rules(
schema,
schema_type,
path,
result,
skip_paths or set(),
restrict_array_items_simple,
)
def value_matches_schema_type(value: Any, schema_type: str) -> bool:
if schema_type == "string":
return isinstance(value, str)
if schema_type == "number":
return is_number_value(value)
if schema_type == "integer":
return is_integer_value(value)
if schema_type == "boolean":
return isinstance(value, bool)
if schema_type == "array":
return isinstance(value, list)
if schema_type == "object":
return isinstance(value, dict)
return False
def validate_input_schema(schema: Any, path: str, result: ValidationResult) -> None:
validate_schema_node(schema, path, result, restrict_array_items_simple=True)
if not isinstance(schema, dict):
return
if schema.get("type") != "object":
result.add_issue("SCH_INPUT_002", json_pointer(path, "type"), "inputSchema root type must be object")
properties = schema.get("properties")
if not isinstance(properties, dict):
result.add_issue("SCH_INPUT_003", json_pointer(path, "properties"), "inputSchema properties must be object")
return
for key, subschema in properties.items():
if isinstance(subschema, dict) and subschema.get("type") == "object":
prop_path = json_pointer(json_pointer(path, "properties"), key)
result.add_issue(
"SCH_INPUT_004",
json_pointer(prop_path, "type"),
"inputSchema parameter does not support object type",
)
def _validate_schema_type(schema: dict[str, Any], path: str, result: ValidationResult) -> str | None:
if "type" not in schema:
result.add_issue("SCH002", path, "schema node must define type")
return None
schema_type = schema.get("type")
if schema_type not in SUPPORTED_SCHEMA_TYPES:
result.add_issue("SCH003", json_pointer(path, "type"), "unsupported schema type")
return None
return schema_type
def _validate_schema_keywords(
schema: dict[str, Any],
schema_type: str,
path: str,
result: ValidationResult,
) -> None:
allowed_keys = SCHEMA_ALLOWED_KEYS[schema_type]
for key in schema:
if key in allowed_keys:
continue
issue_code = "SCH401" if schema_type == "boolean" else "SCH004"
result.add_issue(issue_code, json_pointer(path, key), f"unsupported schema keyword for {schema_type}")
def _validate_description(schema: dict[str, Any], path: str, result: ValidationResult) -> None:
if "description" in schema and not isinstance(schema.get("description"), str):
result.add_issue("SCH005", json_pointer(path, "description"), "description must be string")
def _validate_default(
schema: dict[str, Any],
schema_type: str,
path: str,
result: ValidationResult,
) -> None:
if "default" not in schema:
return
default_value = schema.get("default")
if schema_type == "boolean":
if not isinstance(default_value, bool):
result.add_issue("SCH402", json_pointer(path, "default"), "default must be boolean")
return
if schema_type == "array":
if not isinstance(default_value, list):
result.add_issue("SCH506", json_pointer(path, "default"), "default must be array")
return
if not value_matches_schema_type(default_value, schema_type):
result.add_issue("SCH006", json_pointer(path, "default"), "default type does not match schema type")
def _validate_enum(
schema: dict[str, Any],
schema_type: str,
path: str,
result: ValidationResult,
) -> None:
if "enum" not in schema:
return
enum_value = schema.get("enum")
enum_path = json_pointer(path, "enum")
if not isinstance(enum_value, list) or not enum_value:
result.add_issue("SCH007", enum_path, "enum must be a non-empty array")
return
for index, item in enumerate(enum_value):
if value_matches_schema_type(item, schema_type):
continue
item_path = json_pointer(enum_path, str(index))
result.add_issue("SCH007", item_path, "enum item type does not match schema type")
def _validate_type_specific_rules(
schema: dict[str, Any],
schema_type: str,
path: str,
result: ValidationResult,
skip_paths: set[str],
restrict_array_items_simple: bool,
) -> None:
validators = {
"string": _validate_string_schema,
"number": _validate_number_schema,
"integer": _validate_integer_schema,
"boolean": _validate_boolean_schema,
"array": _validate_array_schema,
"object": _validate_object_schema,
}
validators[schema_type](schema, path, result, skip_paths, restrict_array_items_simple)
def _validate_string_schema(
schema: dict[str, Any],
path: str,
result: ValidationResult,
_: set[str],
__: bool,
) -> None:
min_length = schema.get("minLength")
max_length = schema.get("maxLength")
if min_length is not None and not (is_integer_value(min_length) and min_length >= 0):
result.add_issue("SCH101", json_pointer(path, "minLength"), "minLength must be integer >= 0")
if max_length is not None and not (is_integer_value(max_length) and max_length >= 0):
result.add_issue("SCH102", json_pointer(path, "maxLength"), "maxLength must be integer >= 0")
if is_integer_value(min_length) and is_integer_value(max_length) and min_length > max_length:
result.add_issue("SCH103", path, "minLength must not be greater than maxLength")
_validate_pattern(schema.get("pattern"), path, result)
_validate_format(schema.get("format"), path, result)
def _validate_pattern(pattern: Any, path: str, result: ValidationResult) -> None:
if pattern is None:
return
if not isinstance(pattern, str):
result.add_issue("SCH104", json_pointer(path, "pattern"), "pattern must be string")
return
try:
re.compile(pattern)
except re.error:
result.add_issue("SCH104", json_pointer(path, "pattern"), "pattern must be a valid regex")
def _validate_format(format_value: Any, path: str, result: ValidationResult) -> None:
if format_value is None:
return
if format_value not in SUPPORTED_STRING_FORMATS:
result.add_issue("SCH105", json_pointer(path, "format"), "unsupported string format")
def _validate_number_schema(
schema: dict[str, Any],
path: str,
result: ValidationResult,
_: set[str],
__: bool,
) -> None:
_validate_numeric_keyword(schema, path, result, ("minimum", "maximum"), "SCH201", is_number_value)
_validate_numeric_keyword(
schema,
path,
result,
("exclusiveMinimum", "exclusiveMaximum"),
"SCH202",
is_number_value,
)
_validate_range_pair(schema, path, result, "minimum", "maximum", "SCH203", lambda left, right: left > right)
_validate_range_pair(
schema,
path,
result,
"exclusiveMinimum",
"exclusiveMaximum",
"SCH204",
lambda left, right: left >= right,
)
multiple_of = schema.get("multipleOf")
if multiple_of is not None and not (is_number_value(multiple_of) and multiple_of > 0):
result.add_issue("SCH205", json_pointer(path, "multipleOf"), "multipleOf must be a positive number")
def _validate_integer_schema(
schema: dict[str, Any],
path: str,
result: ValidationResult,
_: set[str],
__: bool,
) -> None:
keys = ("minimum", "maximum", "exclusiveMinimum", "exclusiveMaximum", "multipleOf")
_validate_numeric_keyword(schema, path, result, keys, "SCH301", is_integer_value)
multiple_of = schema.get("multipleOf")
if multiple_of is not None and not (is_integer_value(multiple_of) and multiple_of > 0):
result.add_issue("SCH302", json_pointer(path, "multipleOf"), "multipleOf must be integer > 0")
_validate_range_pair(schema, path, result, "minimum", "maximum", "SCH303", lambda left, right: left > right)
_validate_range_pair(
schema,
path,
result,
"exclusiveMinimum",
"exclusiveMaximum",
"SCH304",
lambda left, right: left >= right,
)
def _validate_boolean_schema(
_: dict[str, Any],
__: str,
___: ValidationResult,
____: set[str],
_____: bool,
) -> None:
return
def _validate_array_schema(
schema: dict[str, Any],
path: str,
result: ValidationResult,
_: set[str],
restrict_array_items_simple: bool,
) -> None:
items = schema.get("items")
items_path = json_pointer(path, "items")
if "items" not in schema:
result.add_issue("SCH501", items_path, "array schema must define items")
elif not isinstance(items, dict):
result.add_issue("SCH502", items_path, "items must be a schema object")
else:
validate_schema_node(items, items_path, result, restrict_array_items_simple=restrict_array_items_simple)
if restrict_array_items_simple and items.get("type") not in {"boolean", "integer", "number", "string"}:
result.add_issue("SCH507", json_pointer(items_path, "type"), "array items.type is not supported")
_validate_array_size(schema, path, result)
if "uniqueItems" in schema and not isinstance(schema.get("uniqueItems"), bool):
result.add_issue("SCH505", json_pointer(path, "uniqueItems"), "uniqueItems must be boolean")
_validate_array_default(schema.get("default"), items, schema.get("uniqueItems"), path, result)
def _validate_array_size(schema: dict[str, Any], path: str, result: ValidationResult) -> None:
min_items = schema.get("minItems")
max_items = schema.get("maxItems")
if min_items is not None and not (is_integer_value(min_items) and min_items >= 0):
result.add_issue("SCH503", json_pointer(path, "minItems"), "minItems must be integer >= 0")
if max_items is not None and not (is_integer_value(max_items) and max_items >= 0):
result.add_issue("SCH503", json_pointer(path, "maxItems"), "maxItems must be integer >= 0")
if is_integer_value(min_items) and is_integer_value(max_items) and min_items > max_items:
result.add_issue("SCH504", path, "minItems must not be greater than maxItems")
def _validate_array_default(
default_value: Any,
items: Any,
unique_items: Any,
path: str,
result: ValidationResult,
) -> None:
if default_value is None or not isinstance(default_value, list) or not isinstance(items, dict):
return
item_type = items.get("type")
if item_type in SUPPORTED_SCHEMA_TYPES:
_validate_array_default_items(default_value, item_type, path, result)
if unique_items is True:
_validate_array_default_uniqueness(default_value, path, result)
def _validate_array_default_items(
default_value: list[Any],
item_type: str,
path: str,
result: ValidationResult,
) -> None:
default_path = json_pointer(path, "default")
for index, item in enumerate(default_value):
if value_matches_schema_type(item, item_type):
continue
item_path = json_pointer(default_path, str(index))
result.add_issue("SCH508", item_path, "default array item type does not match items.type")
def _validate_array_default_uniqueness(
default_value: list[Any],
path: str,
result: ValidationResult,
) -> None:
seen: set[Any] = set()
default_path = json_pointer(path, "default")
for index, item in enumerate(default_value):
marker = _freeze_value(item)
if marker in seen:
item_path = json_pointer(default_path, str(index))
result.add_issue("SCH509", item_path, "default array must not contain duplicates")
seen.add(marker)
def _validate_object_schema(
schema: dict[str, Any],
path: str,
result: ValidationResult,
skip_paths: set[str],
restrict_array_items_simple: bool,
) -> None:
properties_path = json_pointer(path, "properties")
properties = schema.get("properties")
if "properties" not in schema:
result.add_issue("SCH606", properties_path, "object schema must define properties")
properties = None
elif not isinstance(properties, dict):
result.add_issue("SCH601", properties_path, "properties must be object")
properties = None
_validate_object_properties(properties, properties_path, result, skip_paths, restrict_array_items_simple)
_validate_required(schema.get("required"), properties, path, result)
def _validate_object_properties(
properties: Any,
properties_path: str,
result: ValidationResult,
skip_paths: set[str],
restrict_array_items_simple: bool,
) -> None:
if not isinstance(properties, dict):
return
for key, subschema in properties.items():
prop_path = json_pointer(properties_path, key)
if prop_path in skip_paths:
continue
if not isinstance(subschema, dict):
result.add_issue("SCH602", prop_path, "property schema must be an object")
continue
validate_schema_node(
subschema,
prop_path,
result,
skip_paths=skip_paths,
restrict_array_items_simple=restrict_array_items_simple,
)
def _validate_required(required: Any, properties: Any, path: str, result: ValidationResult) -> None:
if required is None:
return
required_path = json_pointer(path, "required")
if not isinstance(required, list) or not all(isinstance(item, str) for item in required):
result.add_issue("SCH603", required_path, "required must be a string array")
return
seen: set[str] = set()
for item in required:
if item in seen:
result.add_issue("SCH605", required_path, "required must not contain duplicates")
seen.add(item)
if isinstance(properties, dict) and item in properties:
continue
result.add_issue("SCH604", required_path, f"required field '{item}' is not defined in properties")
def _validate_numeric_keyword(
schema: dict[str, Any],
path: str,
result: ValidationResult,
keys: tuple[str, ...],
issue_code: str,
matcher,
) -> None:
for key in keys:
value = schema.get(key)
if key in schema and not matcher(value):
result.add_issue(issue_code, json_pointer(path, key), f"{key} has invalid type")
def _validate_range_pair(
schema: dict[str, Any],
path: str,
result: ValidationResult,
left_key: str,
right_key: str,
issue_code: str,
comparator,
) -> None:
left_value = schema.get(left_key)
right_value = schema.get(right_key)
if not (is_number_value(left_value) and is_number_value(right_value)):
return
if comparator(left_value, right_value):
result.add_issue(issue_code, path, f"{left_key} and {right_key} relationship is invalid")
def _freeze_value(value: Any) -> Any:
if isinstance(value, list):
return tuple(_freeze_value(item) for item in value)
if isinstance(value, dict):
return tuple(sorted((key, _freeze_value(item)) for key, item in value.items()))
return value