Skip to content

Estimators & Observers

Estimators and observers provide estimates for states or external disturbances when they are not directly measurable. OpenSMC includes 7 powerful estimators, including ESO, HGO, and Levant's differentiator.

Usage Example

from opensmc.estimators import ExtendedStateObserver

# Create an ESO for simultaneous state and disturbance estimation
eso = ExtendedStateObserver(order=2, gains=[10, 20])

# Update observer based on measurement and control input
x_hat, disturbance_hat = eso.update(y=1.0, u=0.5, dt=0.01)

Available Estimators

estimators

OpenSMC Estimators — 7 disturbance/state estimator implementations.

Classes

DisturbanceObserver

Bases: Estimator

Disturbance observer for 2nd-order mechanical systems.

Parameters

J : float Inertia (default 2.0). b : float Damping coefficient (default 0.15). k1 : float Disturbance estimation gain (default 5000.0). k2 : float Velocity estimation gain (default 200.0). dt : float Integration time step for Euler update (default 1e-4).

Source code in opensmc/estimators/disturbance_observer.py
class DisturbanceObserver(Estimator):
    """Disturbance observer for 2nd-order mechanical systems.

    Parameters
    ----------
    J : float
        Inertia (default 2.0).
    b : float
        Damping coefficient (default 0.15).
    k1 : float
        Disturbance estimation gain (default 5000.0).
    k2 : float
        Velocity estimation gain (default 200.0).
    dt : float
        Integration time step for Euler update (default 1e-4).
    """

    def __init__(self, J=2.0, b=0.15, k1=5000.0, k2=200.0, dt=1e-4):
        self.J = J
        self.b = b
        self.k1 = k1
        self.k2 = k2
        self.dt = dt

        # Derived parameters
        self.a = 1.0 / J       # input gain
        self.b_obs = b / J     # damping ratio

        # Internal state: [d_hat, omega_hat]
        self._z = np.zeros(2)

    def _dynamics(self, z, u, x2):
        """Compute dz/dt for the observer.

        Parameters
        ----------
        z : ndarray (2,)
            Observer state [d_hat, omega_hat].
        u : float
            Control input.
        x2 : float
            Measured velocity (second state of the plant).

        Returns
        -------
        zdot : ndarray (2,)
        """
        d_hat, omega_hat = z
        err = omega_hat - x2
        d_hat_dot = -self.k1 * err
        omega_hat_dot = (self.a * d_hat + self.a * u
                         - self.k2 * err - self.b_obs * x2)
        return np.array([d_hat_dot, omega_hat_dot])

    def estimate(self, t, x, u, y=None, **kwargs):
        """Compute disturbance estimate and advance one Euler step.

        Parameters
        ----------
        t : float
            Current time.
        x : ndarray
            Plant state; x[1] is used as the velocity measurement.
        u : float or ndarray
            Control input (scalar).
        y : ignored
        **kwargs :
            dt : float, optional — override the default time step.

        Returns
        -------
        dhat : float
            Estimated disturbance.
        """
        dt = kwargs.get('dt', self.dt)
        x2 = float(x[1]) if hasattr(x, '__len__') else float(x)
        u_val = float(u) if hasattr(u, '__len__') else float(u)

        # Current estimate (before update)
        dhat = float(self._z[0])

        # Euler integration
        zdot = self._dynamics(self._z, u_val, x2)
        self._z = self._z + dt * zdot

        return dhat

    def reset(self):
        """Reset observer state to zero."""
        self._z = np.zeros(2)
Functions
estimate(t, x, u, y=None, **kwargs)

Compute disturbance estimate and advance one Euler step.

Parameters

t : float Current time. x : ndarray Plant state; x[1] is used as the velocity measurement. u : float or ndarray Control input (scalar). y : ignored **kwargs : dt : float, optional — override the default time step.

Returns

dhat : float Estimated disturbance.

Source code in opensmc/estimators/disturbance_observer.py
def estimate(self, t, x, u, y=None, **kwargs):
    """Compute disturbance estimate and advance one Euler step.

    Parameters
    ----------
    t : float
        Current time.
    x : ndarray
        Plant state; x[1] is used as the velocity measurement.
    u : float or ndarray
        Control input (scalar).
    y : ignored
    **kwargs :
        dt : float, optional — override the default time step.

    Returns
    -------
    dhat : float
        Estimated disturbance.
    """
    dt = kwargs.get('dt', self.dt)
    x2 = float(x[1]) if hasattr(x, '__len__') else float(x)
    u_val = float(u) if hasattr(u, '__len__') else float(u)

    # Current estimate (before update)
    dhat = float(self._z[0])

    # Euler integration
    zdot = self._dynamics(self._z, u_val, x2)
    self._z = self._z + dt * zdot

    return dhat
reset()

Reset observer state to zero.

Source code in opensmc/estimators/disturbance_observer.py
def reset(self):
    """Reset observer state to zero."""
    self._z = np.zeros(2)

ExtendedStateObserver

Bases: Estimator

Extended State Observer for 2nd-order systems.

Parameters

alpha1 : float Observer gain 1 (default 6.0). alpha2 : float Observer gain 2 (default 11.0). alpha3 : float Observer gain 3 (default 6.0). epsilon : float Bandwidth parameter; smaller => faster but more peaking (default 0.01). b_input : float Input gain of the plant (default 1.0). dt : float Integration time step for Euler update (default 1e-4).

Source code in opensmc/estimators/eso.py
class ExtendedStateObserver(Estimator):
    """Extended State Observer for 2nd-order systems.

    Parameters
    ----------
    alpha1 : float
        Observer gain 1 (default 6.0).
    alpha2 : float
        Observer gain 2 (default 11.0).
    alpha3 : float
        Observer gain 3 (default 6.0).
    epsilon : float
        Bandwidth parameter; smaller => faster but more peaking (default 0.01).
    b_input : float
        Input gain of the plant (default 1.0).
    dt : float
        Integration time step for Euler update (default 1e-4).
    """

    def __init__(self, alpha1=6.0, alpha2=11.0, alpha3=6.0,
                 epsilon=0.01, b_input=1.0, dt=1e-4):
        self.alpha1 = alpha1
        self.alpha2 = alpha2
        self.alpha3 = alpha3
        self.epsilon = epsilon
        self.b_input = b_input
        self.dt = dt

        # Internal state: [x_hat1, x_hat2, sigma_hat]
        self._z = np.zeros(3)

    def _dynamics(self, z, y, u, eps):
        """Compute dz/dt for the ESO.

        Parameters
        ----------
        z : ndarray (3,)
            Observer state [x_hat1, x_hat2, sigma_hat].
        y : float
            Measured output (x1).
        u : float
            Control input.
        eps : float
            Current bandwidth parameter.

        Returns
        -------
        zdot : ndarray (3,)
        """
        x_hat1, x_hat2, sigma_hat = z
        e_obs = y - x_hat1  # output estimation error

        x_hat1_dot = x_hat2 + (self.alpha1 / eps) * e_obs
        x_hat2_dot = (self.b_input * u + sigma_hat
                      + (self.alpha2 / eps**2) * e_obs)
        sigma_hat_dot = (self.alpha3 / eps**3) * e_obs

        return np.array([x_hat1_dot, x_hat2_dot, sigma_hat_dot])

    def estimate(self, t, x, u, y=None, **kwargs):
        """Compute lumped uncertainty estimate and advance one Euler step.

        Parameters
        ----------
        t : float
            Current time.
        x : ndarray
            Plant state (used for output measurement if y is None).
        u : float or ndarray
            Control input (scalar).
        y : float or None
            Measured output. If None, x[0] is used.
        **kwargs :
            dt : float, optional — override the default time step.
            epsilon : float, optional — override bandwidth parameter.

        Returns
        -------
        dhat : float
            Estimated lumped uncertainty (sigma_hat = f(x) + d).
        """
        dt = kwargs.get('dt', self.dt)
        eps = kwargs.get('epsilon', self.epsilon)

        if y is None:
            y_meas = float(x[0]) if hasattr(x, '__len__') else float(x)
        else:
            y_meas = float(y)

        u_val = float(u) if hasattr(u, '__len__') else float(u)

        # Current estimate (before update)
        dhat = float(self._z[2])

        # Euler integration
        zdot = self._dynamics(self._z, y_meas, u_val, eps)
        self._z = self._z + dt * zdot

        return dhat

    @property
    def state_estimates(self):
        """Return current state estimates [x_hat1, x_hat2, sigma_hat]."""
        return self._z.copy()

    def reset(self):
        """Reset observer state to zero."""
        self._z = np.zeros(3)
Attributes
state_estimates property

Return current state estimates [x_hat1, x_hat2, sigma_hat].

Functions
estimate(t, x, u, y=None, **kwargs)

Compute lumped uncertainty estimate and advance one Euler step.

Parameters

t : float Current time. x : ndarray Plant state (used for output measurement if y is None). u : float or ndarray Control input (scalar). y : float or None Measured output. If None, x[0] is used. **kwargs : dt : float, optional — override the default time step. epsilon : float, optional — override bandwidth parameter.

Returns

dhat : float Estimated lumped uncertainty (sigma_hat = f(x) + d).

Source code in opensmc/estimators/eso.py
def estimate(self, t, x, u, y=None, **kwargs):
    """Compute lumped uncertainty estimate and advance one Euler step.

    Parameters
    ----------
    t : float
        Current time.
    x : ndarray
        Plant state (used for output measurement if y is None).
    u : float or ndarray
        Control input (scalar).
    y : float or None
        Measured output. If None, x[0] is used.
    **kwargs :
        dt : float, optional — override the default time step.
        epsilon : float, optional — override bandwidth parameter.

    Returns
    -------
    dhat : float
        Estimated lumped uncertainty (sigma_hat = f(x) + d).
    """
    dt = kwargs.get('dt', self.dt)
    eps = kwargs.get('epsilon', self.epsilon)

    if y is None:
        y_meas = float(x[0]) if hasattr(x, '__len__') else float(x)
    else:
        y_meas = float(y)

    u_val = float(u) if hasattr(u, '__len__') else float(u)

    # Current estimate (before update)
    dhat = float(self._z[2])

    # Euler integration
    zdot = self._dynamics(self._z, y_meas, u_val, eps)
    self._z = self._z + dt * zdot

    return dhat
reset()

Reset observer state to zero.

Source code in opensmc/estimators/eso.py
def reset(self):
    """Reset observer state to zero."""
    self._z = np.zeros(3)

HighGainObserver

Bases: Estimator

High-Gain Observer for velocity recovery from position measurement.

Parameters

alpha1 : float Observer gain 1 (default 9.0). alpha2 : float Observer gain 2 (default 6.0). epsilon : float Bandwidth parameter; smaller => faster convergence but more peaking (default 0.01). dt : float Integration time step for Euler update (default 1e-4).

Source code in opensmc/estimators/high_gain.py
class HighGainObserver(Estimator):
    """High-Gain Observer for velocity recovery from position measurement.

    Parameters
    ----------
    alpha1 : float
        Observer gain 1 (default 9.0).
    alpha2 : float
        Observer gain 2 (default 6.0).
    epsilon : float
        Bandwidth parameter; smaller => faster convergence but more
        peaking (default 0.01).
    dt : float
        Integration time step for Euler update (default 1e-4).
    """

    def __init__(self, alpha1=9.0, alpha2=6.0, epsilon=0.01, dt=1e-4):
        self.alpha1 = alpha1
        self.alpha2 = alpha2
        self.epsilon = epsilon
        self.dt = dt

        # Internal state: [x_hat1, x_hat2]
        self._z = np.zeros(2)

    def _dynamics(self, z, y, eps):
        """Compute dz/dt for the HGO.

        Parameters
        ----------
        z : ndarray (2,)
            Observer state [x_hat1, x_hat2].
        y : float
            Measured output (x1).
        eps : float
            Current bandwidth parameter.

        Returns
        -------
        zdot : ndarray (2,)
        """
        x_hat1, x_hat2 = z
        e1 = x_hat1 - y
        x_hat1_dot = x_hat2 - (self.alpha2 / eps) * e1
        x_hat2_dot = -(self.alpha1 / eps**2) * e1
        return np.array([x_hat1_dot, x_hat2_dot])

    def estimate(self, t, x, u, y=None, **kwargs):
        """Compute state estimate and advance one Euler step.

        The HGO recovers velocity from position-only measurement.
        It returns x_hat2 (estimated velocity) as the estimate, since
        the primary purpose is to provide unmeasured state information.
        For disturbance estimation, use ESO or DOB instead.

        Parameters
        ----------
        t : float
            Current time.
        x : ndarray
            Plant state (used for output measurement if y is None).
        u : float or ndarray
            Control input (not used in HGO dynamics, kept for interface).
        y : float or None
            Measured output. If None, x[0] is used.
        **kwargs :
            dt : float, optional — override the default time step.
            epsilon : float, optional — override bandwidth parameter.

        Returns
        -------
        x_hat2 : float
            Estimated velocity (second state).
        """
        dt = kwargs.get('dt', self.dt)
        eps = kwargs.get('epsilon', self.epsilon)

        if y is None:
            y_meas = float(x[0]) if hasattr(x, '__len__') else float(x)
        else:
            y_meas = float(y)

        # Current estimate (before update)
        x_hat2 = float(self._z[1])

        # Euler integration
        zdot = self._dynamics(self._z, y_meas, eps)
        self._z = self._z + dt * zdot

        return x_hat2

    @property
    def state_estimates(self):
        """Return current state estimates [x_hat1, x_hat2]."""
        return self._z.copy()

    def reset(self):
        """Reset observer state to zero."""
        self._z = np.zeros(2)
Attributes
state_estimates property

Return current state estimates [x_hat1, x_hat2].

Functions
estimate(t, x, u, y=None, **kwargs)

Compute state estimate and advance one Euler step.

The HGO recovers velocity from position-only measurement. It returns x_hat2 (estimated velocity) as the estimate, since the primary purpose is to provide unmeasured state information. For disturbance estimation, use ESO or DOB instead.

Parameters

t : float Current time. x : ndarray Plant state (used for output measurement if y is None). u : float or ndarray Control input (not used in HGO dynamics, kept for interface). y : float or None Measured output. If None, x[0] is used. **kwargs : dt : float, optional — override the default time step. epsilon : float, optional — override bandwidth parameter.

Returns

x_hat2 : float Estimated velocity (second state).

Source code in opensmc/estimators/high_gain.py
def estimate(self, t, x, u, y=None, **kwargs):
    """Compute state estimate and advance one Euler step.

    The HGO recovers velocity from position-only measurement.
    It returns x_hat2 (estimated velocity) as the estimate, since
    the primary purpose is to provide unmeasured state information.
    For disturbance estimation, use ESO or DOB instead.

    Parameters
    ----------
    t : float
        Current time.
    x : ndarray
        Plant state (used for output measurement if y is None).
    u : float or ndarray
        Control input (not used in HGO dynamics, kept for interface).
    y : float or None
        Measured output. If None, x[0] is used.
    **kwargs :
        dt : float, optional — override the default time step.
        epsilon : float, optional — override bandwidth parameter.

    Returns
    -------
    x_hat2 : float
        Estimated velocity (second state).
    """
    dt = kwargs.get('dt', self.dt)
    eps = kwargs.get('epsilon', self.epsilon)

    if y is None:
        y_meas = float(x[0]) if hasattr(x, '__len__') else float(x)
    else:
        y_meas = float(y)

    # Current estimate (before update)
    x_hat2 = float(self._z[1])

    # Euler integration
    zdot = self._dynamics(self._z, y_meas, eps)
    self._z = self._z + dt * zdot

    return x_hat2
reset()

Reset observer state to zero.

Source code in opensmc/estimators/high_gain.py
def reset(self):
    """Reset observer state to zero."""
    self._z = np.zeros(2)

IntegralChainDifferentiator

Bases: Estimator

Integral Chain Differentiator for arbitrary-order derivative estimation.

Parameters

order : int Differentiation order (number of derivatives to estimate). The observer will have order+1 states (default 2). a_coeffs : list or ndarray or None Hurwitz coefficients [a1, a2, ..., a_{n+1}]. If None, uses coefficients of (s+1)^{n+1} (default). epsilon : float Bandwidth parameter; smaller => more accurate but more peaking (default 0.005). dt : float Integration time step for Euler update (default 1e-4).

Source code in opensmc/estimators/integral_chain.py
class IntegralChainDifferentiator(Estimator):
    """Integral Chain Differentiator for arbitrary-order derivative estimation.

    Parameters
    ----------
    order : int
        Differentiation order (number of derivatives to estimate).
        The observer will have order+1 states (default 2).
    a_coeffs : list or ndarray or None
        Hurwitz coefficients [a1, a2, ..., a_{n+1}].
        If None, uses coefficients of (s+1)^{n+1} (default).
    epsilon : float
        Bandwidth parameter; smaller => more accurate but more peaking
        (default 0.005).
    dt : float
        Integration time step for Euler update (default 1e-4).
    """

    def __init__(self, order=2, a_coeffs=None, epsilon=0.005, dt=1e-4):
        self.order = order
        self.epsilon = epsilon
        self.dt = dt

        n_states = order + 1

        if a_coeffs is not None:
            self.a_coeffs = np.asarray(a_coeffs, dtype=float)
        else:
            # Coefficients of (s+1)^{n+1} (binomial coefficients, reversed)
            self.a_coeffs = np.array(
                [float(_binom(n_states, k)) for k in range(n_states)],
                dtype=float
            )

        if len(self.a_coeffs) != n_states:
            raise ValueError(
                f"a_coeffs length {len(self.a_coeffs)} != order+1 = {n_states}"
            )

        # Internal state
        self._z = np.zeros(n_states)

    def _dynamics(self, z, y, eps):
        """Compute dz/dt for the ICD.

        Parameters
        ----------
        z : ndarray (n+1,)
            Observer state vector.
        y : float
            Measured signal.
        eps : float
            Current bandwidth parameter.

        Returns
        -------
        zdot : ndarray (n+1,)
        """
        n_states = len(z)
        zdot = np.zeros(n_states)

        # Chain: z_i_dot = z_{i+1} for i = 0..n-1
        for i in range(n_states - 1):
            zdot[i] = z[i + 1]

        # Last state: correction term
        correction = 0.0
        for j in range(n_states):
            power = n_states - j  # eps^(n+1-j) for 0-indexed j
            if j == 0:
                correction += (self.a_coeffs[j] / eps**power) * (z[0] - y)
            else:
                correction += (self.a_coeffs[j] / eps**power) * z[j]

        zdot[n_states - 1] = -correction

        return zdot

    def estimate(self, t, x, u, y=None, **kwargs):
        """Compute derivative estimates and advance one Euler step.

        Returns the last state (highest derivative estimate), which
        corresponds to the lumped uncertainty estimate in a 2nd-order
        plant context.

        Parameters
        ----------
        t : float
            Current time.
        x : ndarray or float
            Plant state (used for signal measurement if y is None).
        u : float or ndarray
            Control input (not used by the ICD, kept for interface).
        y : float or None
            Measured signal. If None, x[0] is used.
        **kwargs :
            dt : float, optional — override the default time step.
            epsilon : float, optional — override bandwidth parameter.

        Returns
        -------
        dhat : float
            Highest derivative estimate (last observer state).
        """
        dt = kwargs.get('dt', self.dt)
        eps = kwargs.get('epsilon', self.epsilon)

        if y is None:
            if hasattr(x, '__len__'):
                y_meas = float(x[0])
            else:
                y_meas = float(x)
        else:
            y_meas = float(y)

        # Current estimate (before update)
        dhat = float(self._z[-1])

        # Euler integration
        zdot = self._dynamics(self._z, y_meas, eps)
        self._z = self._z + dt * zdot

        return dhat

    @property
    def derivatives(self):
        """Return all current derivative estimates [y, y', y'', ...]."""
        return self._z.copy()

    def reset(self):
        """Reset observer state to zero."""
        self._z = np.zeros(self.order + 1)
Attributes
derivatives property

Return all current derivative estimates [y, y', y'', ...].

Functions
estimate(t, x, u, y=None, **kwargs)

Compute derivative estimates and advance one Euler step.

Returns the last state (highest derivative estimate), which corresponds to the lumped uncertainty estimate in a 2nd-order plant context.

Parameters

t : float Current time. x : ndarray or float Plant state (used for signal measurement if y is None). u : float or ndarray Control input (not used by the ICD, kept for interface). y : float or None Measured signal. If None, x[0] is used. **kwargs : dt : float, optional — override the default time step. epsilon : float, optional — override bandwidth parameter.

Returns

dhat : float Highest derivative estimate (last observer state).

Source code in opensmc/estimators/integral_chain.py
def estimate(self, t, x, u, y=None, **kwargs):
    """Compute derivative estimates and advance one Euler step.

    Returns the last state (highest derivative estimate), which
    corresponds to the lumped uncertainty estimate in a 2nd-order
    plant context.

    Parameters
    ----------
    t : float
        Current time.
    x : ndarray or float
        Plant state (used for signal measurement if y is None).
    u : float or ndarray
        Control input (not used by the ICD, kept for interface).
    y : float or None
        Measured signal. If None, x[0] is used.
    **kwargs :
        dt : float, optional — override the default time step.
        epsilon : float, optional — override bandwidth parameter.

    Returns
    -------
    dhat : float
        Highest derivative estimate (last observer state).
    """
    dt = kwargs.get('dt', self.dt)
    eps = kwargs.get('epsilon', self.epsilon)

    if y is None:
        if hasattr(x, '__len__'):
            y_meas = float(x[0])
        else:
            y_meas = float(x)
    else:
        y_meas = float(y)

    # Current estimate (before update)
    dhat = float(self._z[-1])

    # Euler integration
    zdot = self._dynamics(self._z, y_meas, eps)
    self._z = self._z + dt * zdot

    return dhat
reset()

Reset observer state to zero.

Source code in opensmc/estimators/integral_chain.py
def reset(self):
    """Reset observer state to zero."""
    self._z = np.zeros(self.order + 1)

LevantDifferentiator

Bases: Estimator

Arbitrary-order robust exact differentiator (Levant 2003).

For order n, maintains n+1 states z[0]..z[n] such that z[k] -> f^{(k)} in finite time.

Parameters

order : int Differentiation order (number of derivatives to estimate). Default 1. L : float Lipschitz constant bound on |f^{(n+1)}|. Default 10.0. lambdas : list or ndarray or None Gain vector of length n+1. If None, uses Levant 2003 Table 1 defaults (or linearly spaced gains for orders > 4). dt : float Integration time step for Euler update (default 1e-4).

Source code in opensmc/estimators/levant.py
class LevantDifferentiator(Estimator):
    """Arbitrary-order robust exact differentiator (Levant 2003).

    For order n, maintains n+1 states z[0]..z[n] such that
    z[k] -> f^{(k)} in finite time.

    Parameters
    ----------
    order : int
        Differentiation order (number of derivatives to estimate).
        Default 1.
    L : float
        Lipschitz constant bound on |f^{(n+1)}|. Default 10.0.
    lambdas : list or ndarray or None
        Gain vector of length n+1. If None, uses Levant 2003 Table 1
        defaults (or linearly spaced gains for orders > 4).
    dt : float
        Integration time step for Euler update (default 1e-4).
    """

    def __init__(self, order=1, L=10.0, lambdas=None, dt=1e-4):
        self.order = order
        self.L = L
        self.dt = dt
        n = order

        if lambdas is not None:
            self.lambdas = np.asarray(lambdas, dtype=float)
        elif n in _DEFAULT_LAMBDAS:
            self.lambdas = np.array(_DEFAULT_LAMBDAS[n], dtype=float)
        else:
            self.lambdas = np.linspace(n + 1, 1.1, n + 1)

        if len(self.lambdas) != n + 1:
            raise ValueError(
                f"lambdas length {len(self.lambdas)} != order+1 = {n + 1}"
            )

        # Internal state: z[0..n]
        self._z = np.zeros(n + 1)

    def _step_euler(self, f, dt_step):
        """Advance one Euler step given measured signal f.

        Parameters
        ----------
        f : float
            Current measurement of the signal.
        dt_step : float
            Time step.
        """
        n = self.order
        L = self.L
        lam = self.lambdas
        z = self._z

        zdot = np.zeros(n + 1)
        sigma = z[0] - f  # phi_0 = f

        for k in range(n + 1):
            if k < n:
                alpha_k = (n - k) / (n - k + 1)
                zdot[k] = (-lam[k] * L ** (1.0 / (n - k + 1))
                           * np.abs(sigma) ** alpha_k * np.sign(sigma)
                           + z[k + 1])
            else:
                # Last stage: pure sign correction
                zdot[k] = -lam[k] * L * np.sign(sigma)

            # sigma for next stage: z[k+1] - zdot[k]
            if k < n:
                sigma = z[k + 1] - zdot[k]

        self._z = z + dt_step * zdot

    def estimate(self, t, x, u, y=None, **kwargs):
        """Compute derivative estimates and advance one Euler step.

        Returns the last state (highest derivative estimate).

        Parameters
        ----------
        t : float
            Current time.
        x : ndarray or float
            Plant state (used for signal measurement if y is None).
        u : float or ndarray
            Control input (not used by the differentiator, kept for
            interface compatibility).
        y : float or None
            Measured signal. If None, x[0] is used.
        **kwargs :
            dt : float, optional — override the default time step.

        Returns
        -------
        dhat : float or ndarray
            Highest derivative estimate (z[n]). If order == 1, returns
            scalar. For higher orders, still returns the highest-order
            derivative as a scalar.
        """
        dt = kwargs.get('dt', self.dt)

        if y is None:
            if hasattr(x, '__len__'):
                f = float(x[0])
            else:
                f = float(x)
        else:
            f = float(y)

        # Current estimate (before update)
        dhat = float(self._z[-1])

        # Euler integration
        self._step_euler(f, dt)

        return dhat

    @property
    def derivatives(self):
        """Return all current derivative estimates [f, f', f'', ...]."""
        return self._z.copy()

    def reset(self):
        """Reset differentiator state to zero."""
        self._z = np.zeros(self.order + 1)
Attributes
derivatives property

Return all current derivative estimates [f, f', f'', ...].

Functions
estimate(t, x, u, y=None, **kwargs)

Compute derivative estimates and advance one Euler step.

Returns the last state (highest derivative estimate).

Parameters

t : float Current time. x : ndarray or float Plant state (used for signal measurement if y is None). u : float or ndarray Control input (not used by the differentiator, kept for interface compatibility). y : float or None Measured signal. If None, x[0] is used. **kwargs : dt : float, optional — override the default time step.

Returns

dhat : float or ndarray Highest derivative estimate (z[n]). If order == 1, returns scalar. For higher orders, still returns the highest-order derivative as a scalar.

Source code in opensmc/estimators/levant.py
def estimate(self, t, x, u, y=None, **kwargs):
    """Compute derivative estimates and advance one Euler step.

    Returns the last state (highest derivative estimate).

    Parameters
    ----------
    t : float
        Current time.
    x : ndarray or float
        Plant state (used for signal measurement if y is None).
    u : float or ndarray
        Control input (not used by the differentiator, kept for
        interface compatibility).
    y : float or None
        Measured signal. If None, x[0] is used.
    **kwargs :
        dt : float, optional — override the default time step.

    Returns
    -------
    dhat : float or ndarray
        Highest derivative estimate (z[n]). If order == 1, returns
        scalar. For higher orders, still returns the highest-order
        derivative as a scalar.
    """
    dt = kwargs.get('dt', self.dt)

    if y is None:
        if hasattr(x, '__len__'):
            f = float(x[0])
        else:
            f = float(x)
    else:
        f = float(y)

    # Current estimate (before update)
    dhat = float(self._z[-1])

    # Euler integration
    self._step_euler(f, dt)

    return dhat
reset()

Reset differentiator state to zero.

Source code in opensmc/estimators/levant.py
def reset(self):
    """Reset differentiator state to zero."""
    self._z = np.zeros(self.order + 1)

NoEstimator

Bases: Estimator

Null estimator — returns zero. Used when no estimation is needed.

Source code in opensmc/core.py
class NoEstimator(Estimator):
    """Null estimator — returns zero. Used when no estimation is needed."""

    def estimate(self, t, x, u, y=None, **kwargs):
        return 0.0

RBF_ELM

Bases: Estimator

RBF network with OS-ELM online learning for disturbance estimation.

Parameters

n_input : int Input dimension (default 2). n_hidden : int Number of RBF hidden neurons / centers (default 50). x_min : array-like Lower bounds for center placement domain (default [-3, -5]). x_max : array-like Upper bounds for center placement domain (default [3, 5]). lam : float Regularization parameter for initial P matrix: P_0 = I / lam (default 1e-4). k_nn : int Number of nearest neighbors for width computation (default 3). seed : int or None Random seed for reproducible center placement (default None). dt : float Not used directly (kept for interface consistency). Default 1e-3.

Source code in opensmc/estimators/rbf_elm.py
class RBF_ELM(Estimator):
    """RBF network with OS-ELM online learning for disturbance estimation.

    Parameters
    ----------
    n_input : int
        Input dimension (default 2).
    n_hidden : int
        Number of RBF hidden neurons / centers (default 50).
    x_min : array-like
        Lower bounds for center placement domain (default [-3, -5]).
    x_max : array-like
        Upper bounds for center placement domain (default [3, 5]).
    lam : float
        Regularization parameter for initial P matrix:
        P_0 = I / lam (default 1e-4).
    k_nn : int
        Number of nearest neighbors for width computation (default 3).
    seed : int or None
        Random seed for reproducible center placement (default None).
    dt : float
        Not used directly (kept for interface consistency). Default 1e-3.
    """

    def __init__(self, n_input=2, n_hidden=50, x_min=None, x_max=None,
                 lam=1e-4, k_nn=3, seed=None, dt=1e-3):
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.lam = lam
        self.k_nn = k_nn
        self.dt = dt

        if x_min is None:
            x_min = -np.ones(n_input) * 3.0
        if x_max is None:
            x_max = np.ones(n_input) * 3.0

        self.x_min = np.asarray(x_min, dtype=float)
        self.x_max = np.asarray(x_max, dtype=float)

        if len(self.x_min) != n_input or len(self.x_max) != n_input:
            raise ValueError(
                f"x_min/x_max length must match n_input={n_input}"
            )

        # Initialize random state
        self._rng = np.random.RandomState(seed)

        # Place centers via Latin Hypercube Sampling
        self.centers = self._lhs_centers()

        # Compute widths via mean of k nearest neighbor distances
        self.widths = self._compute_widths()

        # Output weights: W_out (n_hidden,)
        self.W_out = np.zeros(n_hidden)

        # Inverse correlation matrix: P (n_hidden x n_hidden)
        self.P = np.eye(n_hidden) / lam

    def _lhs_centers(self):
        """Latin Hypercube Sampling for center placement."""
        centers = np.zeros((self.n_hidden, self.n_input))
        for d in range(self.n_input):
            perm = self._rng.permutation(self.n_hidden)
            for i in range(self.n_hidden):
                centers[i, d] = (perm[i] + self._rng.rand()) / self.n_hidden
            # Scale to [x_min, x_max]
            centers[:, d] = (self.x_min[d]
                             + centers[:, d] * (self.x_max[d] - self.x_min[d]))
        return centers

    def _compute_widths(self):
        """Compute RBF widths as mean of k nearest neighbor distances."""
        # Pairwise squared distances
        diff = self.centers[:, np.newaxis, :] - self.centers[np.newaxis, :, :]
        D = np.sqrt(np.sum(diff**2, axis=2))  # (L, L)

        widths = np.zeros(self.n_hidden)
        for j in range(self.n_hidden):
            dists = np.sort(D[j])
            # Skip index 0 (distance to self = 0), take next k_nn
            k_dists = dists[1:self.k_nn + 1]
            widths[j] = np.mean(k_dists)

        # Safety: ensure no zero widths
        widths = np.maximum(widths, 1e-6)
        return widths

    def _rbf_activation(self, x):
        """Compute RBF hidden layer output h(x).

        h_j(x) = exp(-||x - c_j||^2 / (2 * w_j^2))

        Parameters
        ----------
        x : ndarray (n_input,)
            Input vector.

        Returns
        -------
        h : ndarray (n_hidden,)
            Hidden layer activations.
        """
        x = np.asarray(x, dtype=float)
        diff = x - self.centers  # (n_hidden, n_input)
        sq_dists = np.sum(diff**2, axis=1)  # (n_hidden,)
        h = np.exp(-sq_dists / (2.0 * self.widths**2))
        return h

    def forward(self, x):
        """Forward pass: d_hat = W_out^T * h(x).

        Parameters
        ----------
        x : ndarray (n_input,)
            Input feature vector.

        Returns
        -------
        d_hat : float
            Network output (disturbance estimate).
        """
        h = self._rbf_activation(x)
        return float(np.dot(self.W_out, h))

    def online_update(self, x, target):
        """OS-ELM update via Woodbury identity (Liang 2006).

        Parameters
        ----------
        x : ndarray (n_input,)
            Input feature vector.
        target : float
            Target value (reconstructed disturbance).

        Returns
        -------
        d_hat : float
            Prediction BEFORE the weight update.
        """
        h = self._rbf_activation(x)        # (L,)
        d_hat = float(np.dot(self.W_out, h))  # prediction before update

        # Woodbury update
        Ph = self.P @ h                     # (L,)
        denom = 1.0 + h @ Ph                # scalar
        self.P = self.P - np.outer(Ph, Ph) / denom

        e = target - d_hat                  # prediction error
        self.W_out = self.W_out + self.P @ h * e

        return d_hat

    def estimate(self, t, x, u, y=None, **kwargs):
        """Compute disturbance estimate from state features.

        Uses the RBF-ELM network to estimate d_hat from the plant
        state x (used as input features). If a target is provided
        via kwargs, an online update is performed.

        Parameters
        ----------
        t : float
            Current time.
        x : ndarray
            Plant state (used as input features for the network).
        u : float or ndarray
            Control input (not used directly by the network).
        y : float or None
            Measured output (not used directly).
        **kwargs :
            target : float, optional — if provided, perform an online
                update with this target value after computing the estimate.

        Returns
        -------
        dhat : float
            Estimated disturbance.
        """
        x_feat = np.asarray(x, dtype=float).ravel()

        # If the input dimension doesn't match, try to use the first
        # n_input elements
        if len(x_feat) > self.n_input:
            x_feat = x_feat[:self.n_input]

        target = kwargs.get('target', None)
        if target is not None:
            dhat = self.online_update(x_feat, float(target))
        else:
            dhat = self.forward(x_feat)

        return dhat

    def reset(self):
        """Reset output weights and P matrix (keep centers and widths)."""
        self.W_out = np.zeros(self.n_hidden)
        self.P = np.eye(self.n_hidden) / self.lam
Functions
estimate(t, x, u, y=None, **kwargs)

Compute disturbance estimate from state features.

Uses the RBF-ELM network to estimate d_hat from the plant state x (used as input features). If a target is provided via kwargs, an online update is performed.

Parameters

t : float Current time. x : ndarray Plant state (used as input features for the network). u : float or ndarray Control input (not used directly by the network). y : float or None Measured output (not used directly). **kwargs : target : float, optional — if provided, perform an online update with this target value after computing the estimate.

Returns

dhat : float Estimated disturbance.

Source code in opensmc/estimators/rbf_elm.py
def estimate(self, t, x, u, y=None, **kwargs):
    """Compute disturbance estimate from state features.

    Uses the RBF-ELM network to estimate d_hat from the plant
    state x (used as input features). If a target is provided
    via kwargs, an online update is performed.

    Parameters
    ----------
    t : float
        Current time.
    x : ndarray
        Plant state (used as input features for the network).
    u : float or ndarray
        Control input (not used directly by the network).
    y : float or None
        Measured output (not used directly).
    **kwargs :
        target : float, optional — if provided, perform an online
            update with this target value after computing the estimate.

    Returns
    -------
    dhat : float
        Estimated disturbance.
    """
    x_feat = np.asarray(x, dtype=float).ravel()

    # If the input dimension doesn't match, try to use the first
    # n_input elements
    if len(x_feat) > self.n_input:
        x_feat = x_feat[:self.n_input]

    target = kwargs.get('target', None)
    if target is not None:
        dhat = self.online_update(x_feat, float(target))
    else:
        dhat = self.forward(x_feat)

    return dhat
forward(x)

Forward pass: d_hat = W_out^T * h(x).

Parameters

x : ndarray (n_input,) Input feature vector.

Returns

d_hat : float Network output (disturbance estimate).

Source code in opensmc/estimators/rbf_elm.py
def forward(self, x):
    """Forward pass: d_hat = W_out^T * h(x).

    Parameters
    ----------
    x : ndarray (n_input,)
        Input feature vector.

    Returns
    -------
    d_hat : float
        Network output (disturbance estimate).
    """
    h = self._rbf_activation(x)
    return float(np.dot(self.W_out, h))
online_update(x, target)

OS-ELM update via Woodbury identity (Liang 2006).

Parameters

x : ndarray (n_input,) Input feature vector. target : float Target value (reconstructed disturbance).

Returns

d_hat : float Prediction BEFORE the weight update.

Source code in opensmc/estimators/rbf_elm.py
def online_update(self, x, target):
    """OS-ELM update via Woodbury identity (Liang 2006).

    Parameters
    ----------
    x : ndarray (n_input,)
        Input feature vector.
    target : float
        Target value (reconstructed disturbance).

    Returns
    -------
    d_hat : float
        Prediction BEFORE the weight update.
    """
    h = self._rbf_activation(x)        # (L,)
    d_hat = float(np.dot(self.W_out, h))  # prediction before update

    # Woodbury update
    Ph = self.P @ h                     # (L,)
    denom = 1.0 + h @ Ph                # scalar
    self.P = self.P - np.outer(Ph, Ph) / denom

    e = target - d_hat                  # prediction error
    self.W_out = self.W_out + self.P @ h * e

    return d_hat
reset()

Reset output weights and P matrix (keep centers and widths).

Source code in opensmc/estimators/rbf_elm.py
def reset(self):
    """Reset output weights and P matrix (keep centers and widths)."""
    self.W_out = np.zeros(self.n_hidden)
    self.P = np.eye(self.n_hidden) / self.lam