Skip to content

API

physiomodeler

Model

Model(
    dynamics: Callable | "Model" | list[Callable | "Model"],
    *,
    state_components: list[str] | None = None,
    initial_state: dict[str, float] | None = None,
    state_derivative_label_map: (
        dict[str, str] | None
    ) = None,
    inputs: dict[str, float | int | Callable] | None = None,
    parameters: (
        dict[str, int | float | str | bool | list | tuple]
        | None
    ) = None,
    events: list[Callable] | None = None,
    post_analysis: list[Callable] | None = None,
    time_units: TimeUnitsType | None = None,
    name: str | None = None
)

A state-space model described by system dynamics and parameters.

This class can be used to describe and numerically analyze a state space model. A state-space model has at least one state variable that evolves over time based on the state, as well as the inputs to and parameters of the model.

The model is described by one or multiple dynamics (functions, submodels, or combinations thereof) that define how the state changes. A dynamics function can have time, state, inputs and parameters as arguments. Each dynamics function should return a dictionary containing state derivatives and other value outputs. Dynamics are called in the order they are provided. The output of each dynamics is added to the input of later dynamics. State derivatives can be returned by different dynamics, but a derivative should be returned for all state variables.

By default, the key for a state derivative should be "d" + the state label, e.g. "dvelocity" for the state "velocity". Alternatively, you can supply a mapping dictionary state_derivative_label_map where the key is the state variable label and the value is the state derivative label.

Inputs and parameters both contain values that can be used by the update function. There is, however, a distinction between inputs and parameters. Parameters are fixed values that are independent of time, e.g. the mass of an object at the end of a pendulum. Inputs are disturbances added to the system, and can be either fixed or variable with time, e.g. the force applied to a moving mass. inputs and parameters are both dictionaries. inputs values can be functions with the signature fun(time, inputs, parameters), which are converted to the appropriate before being passed to the update function.

Events are functions with a signature fun(time, state, inputs, parameters). An event occurs when a function returns 0. The solver will find an accurate value of time where fun(time, ...) == 0. This ensures proper simulation around that time point. Each event function can be marked 'terminal' by adding the terminal attribute with value True to the event function (fun.terminal = True). This results in the simulation being terminated as soon as the event return value crosses 0. See scipy.integrate.solve_ivp for more details.

Post analysis functions can be supplied, which receive the output of the system as a pandas DataFrame (see run_simulation) and either update the dataframe or return an array, list, Series or DataFrame object with the same length that is added to the output of the model.

Example

The example below is the classic undampended pendulum. The system is governed by the second derivative of the angle theta. The system state has two components: the current angle "theta" and the angular velocity "dtheta". The function pendulum() calculates the second derivative and returns both the first and second derivative. When this system is solved, the new state is calculated based on the derivatives of the previous step.

>>> def pendulum(time, state, inputs, parameters):
...     g = parameters["g"]
...     l = parameters["l"]
...     theta = state["theta"]
...
...     first_derivative_theta = state["dtheta"]
...     second_derivative_theta = -(g / l) * np.sin(theta)
...
...     return {
...         "dtheta": first_derivative_theta,
...         "ddtheta": second_derivative_theta
...     }
>>> model = Model(
...     function=pendulum,
...     state=("theta", "dtheta"),
...     parameters={"g": 9.81, "l": 1},
... )
>>> df = model.run_simulation(time=10, initial_state=(0.1, 0))

Parameters:

Name Type Description Default
function Callable | list[Callable]

function(s) with accepting time, state, inputs and parameters as arguments and returning a dictionary containing (at least) the state derivatives

required
state list | dict

list of state labels, or a dictionary with the intial values of the state

required
inputs dict

dictionary containing the inputs as name/value pairs

None
parameters dict

a dictionary containing the parameters of the model

None
post_analysis dict

optional, a dictionary containing functions to calculate values based on the output of the model

None
state_derivative_label_map dict

optional, a dictionary mapping the labels of the state derivatives to the labels of the state variables.

None

Initialize a Model instance.

Parameters:

Name Type Description Default
dynamics Callable | 'Model' | list[Callable | 'Model']

System dynamics defining state evolution. Can be a single function, Model instance, or list of callables/Models.

required
state_components list[str] | None

Optional list of state variable labels.

None
initial_state dict[str, float] | None

Optional initial values for state variables as dict.

None
state_derivative_label_map dict[str, str] | None

Maps state labels to their derivative labels. If not provided, defaults are generated from settings.

None
inputs dict[str, float | int | Callable] | None

Model inputs as dict. Values can be constants or time-dependent functions.

None
parameters dict[str, int | float | str | bool | list | tuple] | None

Model parameters as dict. Fixed values independent of time.

None
events list[Callable] | None

List of event detection functions.

None
post_analysis list[Callable] | None

List of post-simulation analysis functions.

None
time_units TimeUnitsType | None

Units for time axis in output (e.g., "s", "ms").

None
name str | None

Optional model name. Auto-generated if not provided.

None
Source code in physiomodeler/model.py
def __init__(
    self,
    dynamics: Callable | "Model" | list[Callable | "Model"],
    *,
    state_components: list[str] | None = None,
    initial_state: dict[str, float] | None = None,
    state_derivative_label_map: dict[str, str] | None = None,
    inputs: dict[str, float | int | Callable] | None = None,
    parameters: dict[str, int | float | str | bool | list | tuple] | None = None,
    events: list[Callable] | None = None,
    post_analysis: list[Callable] | None = None,
    time_units: TimeUnitsType | None = None,
    name: str | None = None,
):
    """Initialize a Model instance.

    Args:
        dynamics: System dynamics defining state evolution. Can be a single function,
            Model instance, or list of callables/Models.
        state_components: Optional list of state variable labels.
        initial_state: Optional initial values for state variables as dict.
        state_derivative_label_map: Maps state labels to their derivative labels.
            If not provided, defaults are generated from settings.
        inputs: Model inputs as dict. Values can be constants or time-dependent functions.
        parameters: Model parameters as dict. Fixed values independent of time.
        events: List of event detection functions.
        post_analysis: List of post-simulation analysis functions.
        time_units: Units for time axis in output (e.g., "s", "ms").
        name: Optional model name. Auto-generated if not provided.
    """
    dynamics = c.convert_single_function_to_list(dynamics)
    dynamics = c.convert_nonstr_sequence_to_list(dynamics)

    state_components = state_components or []
    initial_state = initial_state or {}
    parameters = parameters or {}
    state_derivative_label_map = state_derivative_label_map or {}

    # gather state labels and initial state from submodels
    # this needs to be done before __attrs_init__ is called, because
    # __attrs_init__ requires the correct state labels to be set
    for function in dynamics:
        if not isinstance((model := function), Model):
            continue

        validate(model)

        for state_label in model.state_components:
            if state_label not in state_components:
                state_components.append(state_label)
                # Update derivative label mapping if available
                if (
                    state_label in model.state_derivative_label_map
                    and state_label not in state_derivative_label_map
                ):
                    state_derivative_label_map[state_label] = (
                        model.state_derivative_label_map[state_label]
                    )

            # Update initial state if not already set
            if (
                state_label in model.initial_state
                and state_label not in initial_state
            ):
                initial_state[state_label] = model.initial_state[state_label]

    getattr(self, "__attrs_init__")(
        dynamics=dynamics,
        state_components=state_components,
        initial_state=initial_state,
        state_derivative_label_map=state_derivative_label_map or {},
        inputs=inputs or {},
        parameters=parameters or {},
        events=events or [],
        post_analysis=post_analysis or [],
        time_units=time_units,
        name=name,
    )

run_simulation

run_simulation(
    *,
    time: (
        float
        | tuple[float, float]
        | tuple[float, float, float]
        | TimeSpec
        | list
        | ndarray
    ),
    initial_state: dict | None = None,
    inputs: dict | None = None,
    parameters: dict | None = None,
    solve_ivp_method: Literal["Radau"] = "Radau",
    relative_tolerance: float | None = None,
    absolute_torelance: float | None = None,
    result_index_as_time: bool = False,
    solver_output: dict | None = None
)

Generate an output based on the given initial state, inputs and parameters.

This function generates the output of the model over a given time axis based on an initial state, the inputs and parameters. If no or not all the initial_state, inputs or parameters values are given, the values provided to the system at initialisation are used instead.

The time can be passed in several ways. The simplest is passing a duration. The model will be solved up to that duration. When time is a tuple of length 2, it indicates the start and end time. (Note that the initial state is the state at t=start, not at t=0.) The time step will be time_step if supplied, or determined by the solver otherwise. If time is a tuple of length 3, the last value is used as the time step. If a list or array is supplied, the output will contain these time points.

Relative and absolute tolerance for the solver. The solver estimates a local error as absolute_torelance + relative_tolerance * abs(state). relative_tolerance controls a relative accuracy ('number of correct digits') while absolute_torelance controls absolute accuracy ('number of correct decimal places'). See scipy.integrate.solve_ivp for more details.

Parameters:

Name Type Description Default
initial_state dict

optional, initial values for the state as a dictionary, or list/tuple of values

None
inputs dict

optional, inputs of the system as a dictionary or list/tuple of values

None
parameters dict

optional, dictionary with the parameters of the system

None
time float | TimeSpec | tuple | ndarray

Time specification. Can be: - float: total duration from 0 to time - TimeSpec: start, end, and optional step (use keyword args for clarity) - tuple[start, end]: time range - tuple[start, end, step]: time range with step - list/ndarray: specific time points

Examples: time=10 # Simulate 0 to 10 time=TimeSpec(start=0, end=10) # Start to end time=TimeSpec(start=0, end=10, step=0.1) # With step

required
solve_ivp_method str

solver used; currently only "Radau" is supported

'Radau'
relative_tolerance float | None

relative tolerance

None
absolute_torelance float | None

absolute tolerance

None
Source code in physiomodeler/model.py
def run_simulation(  # noqa: F811
    self,
    *,
    time: float
    | tuple[float, float]
    | tuple[float, float, float]
    | TimeSpec
    | list
    | np.ndarray,
    initial_state: dict | None = None,
    inputs: dict | None = None,
    parameters: dict | None = None,
    solve_ivp_method: Literal["Radau"] = "Radau",
    relative_tolerance: float | None = None,
    absolute_torelance: float | None = None,
    result_index_as_time: bool = False,
    solver_output: dict | None = None,
):
    """Generate an output based on the given initial state, inputs and
    parameters.

    This function generates the output of the model over a given time axis
    based on an initial state, the inputs and parameters. If no or not all
    the `initial_state`, `inputs` or `parameters` values are given, the
    values provided to the system at initialisation are used instead.

    The time can be passed in several ways. The simplest is passing a
    duration. The model will be solved up to that duration. When `time` is
    a tuple of length 2, it indicates the start and end time. (Note that
    the initial state is the state at `t=start`, not at `t=0`.) The time
    step will be `time_step` if supplied, or determined by the solver otherwise.
    If `time` is a tuple of length 3, the last value is used as the time
    step. If a list or array is supplied, the output will contain these
    time points.

    Relative and absolute tolerance for the solver. The solver estimates a
    local error as `absolute_torelance + relative_tolerance * abs(state)`. `relative_tolerance` controls a relative
    accuracy ('number of correct digits') while `absolute_torelance` controls absolute
    accuracy ('number of correct decimal places'). See
    [`scipy.integrate.solve_ivp`](https://docs.scipy.org/doc/scipy-1.14.1/reference/reference/generated/scipy.integrate.solve_ivp.html#scipy.integrate.solve_ivp)
    for more details.

    Args:
        initial_state (dict): optional, initial values for the state as a
            dictionary, or list/tuple of values
        inputs (dict): optional, inputs of the system as a dictionary or
            list/tuple of values
        parameters (dict): optional, dictionary with the parameters of the system
        time (float | TimeSpec | tuple | np.ndarray): Time specification. Can be:
            - float: total duration from 0 to time
            - TimeSpec: start, end, and optional step (use keyword args for clarity)
            - tuple[start, end]: time range
            - tuple[start, end, step]: time range with step
            - list/ndarray: specific time points

            Examples:
              time=10  # Simulate 0 to 10
              time=TimeSpec(start=0, end=10)  # Start to end
              time=TimeSpec(start=0, end=10, step=0.1)  # With step
        solve_ivp_method (str): solver used; currently only "Radau" is supported
        relative_tolerance (float | None): relative tolerance
        absolute_torelance (float | None): absolute tolerance
    """

    validate(self)

    inputs = v.validate_inputs(inputs)
    parameters = v.validate_parameters(parameters)
    initial_state = v.validate_initial_state(
        initial_state, self.initial_state, self.state_components
    )
    (t_start, t_end, time_) = c.convert_time(time)

    solve_ivp_func, full_output_func, running_output = self.get_functions(
        inputs, parameters
    )
    # Only pass runtime parameters (not model's own parameters)
    runtime_params = parameters if parameters else {}
    events = h.wrap_nested_events(
        model=self,
        state_components=self.state_components,
        inputs=running_output,
        runtime_parameters=runtime_params,
    )

    rtol = relative_tolerance or settings.simulation.relative_tolerance
    atol = absolute_torelance or settings.simulation.absolute_tolerance

    solution = integrate.solve_ivp(
        solve_ivp_func,
        (t_start, t_end),
        [initial_state[k] for k in self.state_components],
        method=solve_ivp_method,
        t_eval=time_,
        events=events,
        rtol=rtol,
        atol=atol,
        vectorized=False,
    )
    if solver_output is not None:
        solver_output.update(**solution)

    # generate an output for each timepoint/state combination
    all_outputs = list(
        full_output_func(time, state)
        for time, state in zip(solution.t, solution.y.T)
    )
    output_keys = all_outputs[0].keys()

    # zip all outputs
    zipped_outputs = {k: [o[k] for o in all_outputs] for k in output_keys}
    if result_index_as_time:
        if not self.time_units:
            raise ValueError(
                "Time units set to None, cannot convert index to timedelta."
            )
        index_name = "time"
        time = pd.to_timedelta(solution.t, unit=self.time_units)
    else:
        index_name = f"time ({self.time_units})" if self.time_units else "time"
        time = solution.t

    data_frame = pd.DataFrame(
        index=pd.Index(data=time, name=index_name),
        data=zipped_outputs,
    )

    for func in self.post_analysis:
        result = func(data_frame.copy())
        if result is None:
            raise TypeError(
                f"Post-analysis function '{func.__name__}' does not return any values."
            )
        if isinstance(result, pd.Series):
            result = result.to_frame()

        if isinstance(result, dict):
            if not len(result):
                raise RuntimeError(
                    f"Post-analysis function '{func.__name__}' returned an empty dictionary."
                )
            # Add dict items as columns
            for key, value in result.items():
                if key in data_frame.columns:
                    msg = f"Post-analysis function '{func.__name__}' returned key '{key}' which already exists in dataframe"
                    raise ValueError(msg)
                data_frame[key] = value
        elif isinstance(result, pd.DataFrame):
            # Check overlapping columns have the same values
            overlap_cols = set(data_frame.columns) & set(result.columns)
            new_cols = set(result.columns) - set(data_frame.columns)

            for col in overlap_cols:
                if not data_frame[col].equals(result[col]):
                    msg = f"Post-analysis function '{func.__name__}' returned dataframe with column '{col}' that has different values than the original"
                    raise ValueError(msg)

            if not len(new_cols):
                raise RuntimeError(
                    f"No new columns to add from post-analysis function '{func.__name__}'."
                )

            # Add only new columns
            data_frame = pd.concat([data_frame, result[list(new_cols)]], axis=1)
        else:
            msg = f"Post-analysis function '{func.__name__}' must return a dict, dataframe or series, not {type(result)}"
            raise TypeError(msg)

    # data_frame.attrs["solve_result"] = solution

    return data_frame

find_equilibrium_state

find_equilibrium_state(
    *,
    estimated_equilibrium_state: dict | None = None,
    **kwargs
) -> dict

Find the equilibrium state of the model.

The equilibrium state is either a state of absolute equilibrium if the inputs are constant, the state at the start of a period resulting in a periodic output if the inputs are periodic.

This function runs the model from 0 to time a maximum of max_n_runs times. The initial state is equal to the estimated equilibrium state during the first run, and the last state of the last iteration each subsequent run. Assuming the model converges to a stable state, each subsequent run should start and end closer to the equilibrium state than the previous run. When the end state does not change by more than rtol, it is assumed this is the equilibrium state.

Parameters:

Name Type Description Default
time float

the time at which the equilibrium is evaluated.

required
time_step float | None

time step used in running the simulation. Defaults to None.

required
estimated_equilibrium_state dict | None

starting point for finding the equilibrium state. Defaults to None.

None
inputs dict | None

inputs of the model. Defaults to None.

required
parameters dict | None

parameters of the model. Defaults to None.

required
max_n_runs int

maximum number of iterations before the function fails. Defaults to 100.

required
rtol float

relative tolerance of model run. Defaults to 1e-4.

required
atol float

absolute tolerance of the model run. Defaults to 1e-6.

required
rtol_eq float

relative tolerance comparing the current to the previous iteration. Defaults to 1e-3.

required

Raises:

Type Description
RuntimeError

is raised then no equilibrium can be found after max_n_runs iterations.

Returns:

Name Type Description
dict dict

the resulting equilibrium state.

Source code in physiomodeler/model.py
def find_equilibrium_state(
    self, *, estimated_equilibrium_state: dict | None = None, **kwargs
) -> dict:
    """
    Find the equilibrium state of the model.

    The equilibrium state is either a state of absolute equilibrium if the
    inputs are constant, the state at the start of a period resulting in a
    periodic output if the inputs are periodic.

    This function runs the model from 0 to `time` a maximum of `max_n_runs`
    times. The initial state is equal to the estimated equilibrium state
    during the first run, and the last state of the last iteration each
    subsequent run. Assuming the model converges to a stable state, each
    subsequent run should start and end closer to the equilibrium state
    than the previous run.
    When the end state does not change by more than `rtol`, it is assumed
    this is the equilibrium state.

    Args:
        time (float): the time at which the equilibrium is evaluated.
        time_step (float | None, optional): time step used in running the simulation. Defaults to None.
        estimated_equilibrium_state (dict | None, optional): starting point
            for finding the equilibrium state. Defaults to None.
        inputs (dict | None, optional): inputs of the model. Defaults to None.
        parameters (dict | None, optional): parameters of the model. Defaults to None.
        max_n_runs (int, optional): maximum number of iterations before the
            function fails. Defaults to 100.
        rtol (float, optional): relative tolerance of model run. Defaults to 1e-4.
        atol (float, optional): absolute tolerance of the model run. Defaults to 1e-6.
        rtol_eq (float, optional): relative tolerance comparing the current
            to the previous iteration. Defaults to 1e-3.

    Raises:
        RuntimeError: is raised then no equilibrium can be found after
            max_n_runs iterations.

    Returns:
        dict: the resulting equilibrium state.
    """
    result = self.run_to_equilibrium(
        initial_state=estimated_equilibrium_state, **kwargs
    )
    return {
        k: float(v)
        for k, v in result.loc[:, self.state_components].iloc[-1].items()
    }

replace

replace(**changes) -> Self

Create a copy of the model with the given changes.

This function creates a copy of the model with the given changes. Similar to attrs.evolve, but with proper typing.

Parameters:

Name Type Description Default
**changes

changes to the model

{}

Returns:

Name Type Description
Self Self

a copy of the model with the given changes

Source code in physiomodeler/model.py
def replace(self, **changes) -> Self:
    """Create a copy of the model with the given changes.

    This function creates a copy of the model with the given changes.
    Similar to `attrs.evolve`, but with proper typing.

    Args:
        **changes: changes to the model

    Returns:
        Self: a copy of the model with the given changes
    """
    if "state_components" in changes:
        new_state_components = changes["state_components"]
        kept_state_components = set(self.state_components) & set(
            new_state_components
        )

        changes["state_derivative_label_map"] = {
            k: v
            for k, v in self.state_derivative_label_map.items()
            if k in kept_state_components
        } | changes.get("state_derivative_label_map", {})
        changes["initial_state"] = {
            k: v
            for k, v in self.initial_state.items()
            if k in kept_state_components
        } | changes.get("initial_state", {})

        changes["parameters"] = self.parameters | changes.get("parameters", {})

    return attrs.evolve(self, **changes)

events

state_crosses_value

state_crosses_value(
    label: str,
    value: float,
    *,
    terminal: bool = False,
    direction: Literal[-1, 0, 1] = 0
)

Create an event that detects when a state crossing the given value.

Example

This example will have an event and terminate when the volume (state) crosses the value 0.

model = PhysioModeler(
    ...,
    events=[
        state_crosses_value("volume", 0, terminal=True),
    ]
)

Parameters:

Name Type Description Default
label str

label of the state to track

required
value float

the event will trigger when the state crosses this value

required
terminal bool

whether to terminate the simulation if the event occurs

False
direction int

if 0, any crossing will be detected; if 1, only positive crossings (value goes from negative to positive) will be detected; if -1, only negative crossings will be detected.

0
Source code in physiomodeler/events.py
def state_crosses_value(
    label: str,
    value: float,
    *,
    terminal: bool = False,
    direction: Literal[-1, 0, 1] = 0,
):
    """
    Create an event that detects when a state crossing the given value.

    Example:
        This example will have an event and terminate when the volume (state)
        crosses the value 0.

        ```
        model = PhysioModeler(
            ...,
            events=[
                state_crosses_value("volume", 0, terminal=True),
            ]
        )
        ```

    Args:
        label (str): label of the state to track
        value (float): the event will trigger when the state crosses this value
        terminal (bool): whether to terminate the simulation if the event occurs
        direction (int): if 0, any crossing will be detected; if 1, only positive
            crossings (value goes from negative to positive) will be detected; if
            -1, only negative crossings will be detected.
    """

    def event(time, state, inputs, parameters):
        return state[label] - value

    setattr(event, "terminal", terminal)
    setattr(event, "direction", direction)
    return event