cooper
cooper copied to clipboard
A tutorial example for the generation of time series data
Enhancement
A tutorial example for the generation of time series data w.r.t. to constrains on the values.
An open question to me is if the inequalities below should result in a scalar ineq_defect
tensor, i.e., so I need to sum up the defects per time step, or if it is fine the way it is.
Motivation
I want to use cooper to create experiments, e.g. for system identification. In order to ensure that the commanded inputs do not destroy the device, I want the resulting trajectories to obey some device-dependent constraints. I am interested how non-differentiable constraints can be added in this context (yes, I saw the classification demo but am still a bit confused).
Alternatives
There are for sure plenty, since I just started playing with cooper 1h ago :)
Suggested Starting Point
I know this code is unpolished, but I first wanted to know what you think of this idea before I put work into it.
from typing import Callable
import cooper
import matplotlib.pyplot as plt
import torch
def generate_traj(time: torch.Tensor, coeffs: torch.Tensor) -> torch.Tensor:
""" Helper function to generate a 5th order polynomial. """
return (
coeffs[0]
+ coeffs[1] * time
+ coeffs[2] * time ** 2
+ coeffs[3] * time ** 3
+ coeffs[4] * time ** 4
+ coeffs[5] * time ** 5
)
class MinDeviation(cooper.ConstrainedMinimizationProblem):
"""
Generate a trajectory which has the minimal L2 distance from a reference trajectory, which satisfying
an (arbitrarily chosen) inequality constraint on its values.
"""
def __init__(
self,
traj_ref: torch.Tensor,
time_ref: torch.Tensor,
traj_gen_fcn: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
):
# I know this class is far from elegant, but let's keep it for now.
super().__init__(is_constrained=True)
self.traj_ref = traj_ref.detach().clone()
self.time_ref = time_ref.detach().clone()
self.traj_gen_fcn = traj_gen_fcn
self.loss_fcn = torch.nn.MSELoss()
def closure(self, coeffs: torch.Tensor) -> cooper.CMPState:
# Generate the trajectory which we want to constrain.
traj = self.traj_gen_fcn(self.time_ref, coeffs)
# Evaluate the cost function.
loss = self.loss_fcn(self.traj_ref, traj)
# Evaluate the equality constraints g(x) = 0, i.e., the violation thereof.
eq_defect = None
# Evaluate the inequality constraints h(x) <= 0, i.e., the violation thereof.
ineq_defect = traj - torch.tensor([7.0]) # all values should be less or equal than 7
return cooper.CMPState(loss, ineq_defect, eq_defect)
if __name__ == "__main__":
# Configure the experimental design.
time = torch.linspace(0, 2, 51)
coeffs = torch.tensor([4, -3, 3, 2, 0.5, -1])
fig, axs = plt.subplots(1, 1, figsize=(12, 8))
traj_unconstr = generate_traj(time, coeffs)
axs.plot(time.numpy(), traj_unconstr.numpy())
# Define the problem and its Lagrangian formulation.
cmp = MinDeviation(traj_ref=traj_unconstr, time_ref=time, traj_gen_fcn=generate_traj)
formulation = cooper.LagrangianFormulation(cmp)
# Define the primal parameters and optimizer.
coeffs_param = torch.nn.Parameter(coeffs.clone()) # start with the param values of the unconstrained traj
primal_optimizer = cooper.optim.ExtraSGD([coeffs_param], lr=3e-3, momentum=0.7)
# Define the dual optimizer. Note that this optimizer has NOT been fully instantiated
# yet. Cooper takes care of this, once it has initialized the formulation state.
dual_optimizer = cooper.optim.partial_optimizer(cooper.optim.ExtraSGD, lr=9e-4, momentum=0.7)
# Wrap the formulation and both optimizers inside a ConstrainedOptimizer
coop = cooper.ConstrainedOptimizer(formulation, primal_optimizer, dual_optimizer)
# Here is the actual training loop.
# The steps follow closely the `loss -> backward -> step` Pytorch workflow.
for iter in range(3000):
coop.zero_grad()
lagrangian = formulation.composite_objective(cmp.closure, coeffs_param)
formulation.custom_backward(lagrangian)
coop.step(cmp.closure, coeffs_param)
assert not torch.any(torch.isnan(coeffs_param)), f"Observed the first NaN value at iter {iter}!"
# Get trajectory with the final coefficients.
with torch.no_grad():
traj_constr = generate_traj(time, coeffs_param)
axs.plot(time.numpy(), traj_constr.numpy(), ls="--")
plt.show()