OrmSerializer(table_data, table_map, result_set, is_array, depth)
Bases: Generic[SerializedType]
Generate Python models from a table map and result set.
Generate Python models from a table map and result set.
:param table_data: Table data for the returned model type.
:param table_map: Map of table names and models.
:param result_set: SQLAlchemy cursor result.
:param is_array: Whether to deserialize into a list of models instead of a single model.
:param depth: Model tree depth.
| PARAMETER | DESCRIPTION |
| --- | --- |
| table_data | TYPE: OrmTable |
| table_map | TYPE: Map |
| result_set | TYPE: CursorResult[Any] |
| is_array | TYPE: bool |
| depth | TYPE: int |
Source code in ormdantic/generator/_serializer.py
def __init__(
    self,
    table_data: OrmTable,  # type: ignore
    table_map: Map,
    result_set: CursorResult[Any],
    is_array: bool,
    depth: int,
) -> None:
    """Store the inputs and prepare the state used to build models.

    :param table_data: Table data for the returned model type.
    :param table_map: Map of table names and models.
    :param result_set: SQLAlchemy cursor result.
    :param is_array: Whether to deserialize into a list of models
        instead of a single model.
    :param depth: Model tree depth.
    """
    self._table_data = table_data
    self._table_map = table_map
    self._result_set = result_set
    self._is_array = is_array
    self._depth = depth

    # The schema helper is called only after the attributes above are
    # assigned, in case it reads any of them.
    root_name = table_data.tablename
    root_schema = self._get_result_schema(table_data, depth, is_array)
    self._result_schema = ResultSchema(
        is_array=is_array,
        references={root_name: root_schema},
    )

    # The first item of every cursor description entry is the column name.
    description = self._result_set.cursor.description
    self._columns = [entry[0] for entry in description]

    # Nested dict that `deserialize` fills in from the result set.
    self._return_dict: dict[str, Any] = {}
|
deserialize
Deserialize the result set into Python models.
Source code in ormdantic/generator/_serializer.py
def deserialize(self) -> SerializedType:
    """Build Python models from the rows of the result set.

    Each column name is expected to consist of a "/"-separated tree path
    and a column name, joined by a single backslash. Rows are folded into
    a nested dict that follows those paths; levels whose schema is an array
    are keyed by the primary key value found in the row for that path.

    :return: ``None`` when nothing was collected, a list of models when
        the root schema is an array, otherwise a single model.
    """
    for record in self._result_set:
        # Primary key value seen in this row, per tree path.
        pk_by_path = {}
        for position, qualified_name in enumerate(self._columns):
            tree_path, column = qualified_name.split("\\")
            # Every column starts again from the root of both the
            # collected output and the schema.
            target = self._return_dict
            level = self._result_schema
            walked = ""
            for branch in tree_path.split("/"):
                walked += f"/{branch}"
                # Move the schema position down one level.
                level = level.references[branch]
                # Remember the primary key once the walk has reached the
                # full path of a primary key column.
                is_pk_column = (
                    column == level.table_data.pk  # type: ignore
                    and walked == f"/{tree_path}"
                )
                if is_pk_column:
                    pk_by_path[walked] = record[position]
                key = pk_by_path[walked]
                # A null primary key means this branch is absent from
                # the row, so there is nothing to store for the column.
                if key is None:
                    break
                # Create the container for this branch when it is missing.
                if target.get(branch) is None:
                    target[branch] = {}
                children = target[branch]
                if level.is_array:
                    # Array levels hold one entry per primary key value.
                    if children.get(key) is None:
                        children[key] = {}
                    target = children[key]
                else:
                    target = children
            else:
                # Reached only when the walk above did not break.
                if column:
                    target[column] = record[position]

    if not self._return_dict:
        return None  # type: ignore

    as_list = self._result_schema.is_array
    prepared = self._prep_result(self._return_dict, self._result_schema)[
        self._table_data.tablename
    ]
    if as_list:
        return [self._table_data.model(**item) for item in prepared]  # type: ignore
    return self._table_data.model(**prepared)
|