-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathnonlinear_data_driven_mpc_controller.py
More file actions
1524 lines (1292 loc) · 58.7 KB
/
Copy pathnonlinear_data_driven_mpc_controller.py
File metadata and controls
1524 lines (1292 loc) · 58.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from enum import Enum
import cvxpy as cp
import numpy as np
from direct_data_driven_mpc.utilities import (
evaluate_persistent_excitation,
hankel_matrix,
)
# Define the regularization types of `alpha`, considering
# with respect to what variable it is regularized
class AlphaRegType(Enum):
"""
Regularization types for the `alpha` variable used in the formulation of
Data-Driven MPC controllers for nonlinear systems.
Attributes:
APPROXIMATED: Regularizes `alpha` with respect to an approximation of
`alpha_Lin^sr(D_t)`. Based on Remark 1 of [2].
PREVIOUS: Regularizes `alpha` with respect to the previous optimal
alpha value to encourage stationary behavior. Refer to Section V of
[2].
ZERO: Regularizes `alpha` with respect to zero.
References:
[2] J. Berberich, J. Köhler, M. A. Müller and F. Allgöwer, "Linear
Tracking MPC for Nonlinear Systems—Part II: The Data-Driven Case," in
IEEE Transactions on Automatic Control, vol. 67, no. 9, pp. 4406-4421,
Sept. 2022, doi: 10.1109/TAC.2022.3166851.
"""
APPROXIMATED = 0
PREVIOUS = 1
ZERO = 2
class NonlinearDataDrivenMPCController:
"""
A class that implements a Data-Driven Model Predictive Control (MPC)
controller for nonlinear systems. The implementation is based on research
by J. Berberich et al., as described in [2].
References:
[2] J. Berberich, J. Köhler, M. A. Müller and F. Allgöwer, "Linear
Tracking MPC for Nonlinear Systems—Part II: The Data-Driven Case," in
IEEE Transactions on Automatic Control, vol. 67, no. 9, pp. 4406-4421,
Sept. 2022, doi: 10.1109/TAC.2022.3166851.
"""
n: int
"""The estimated order of the system."""
m: int
"""The number of control inputs."""
p: int
"""The number of system outputs."""
u: np.ndarray
"""The persistently exciting input trajectory applied to the system."""
y: np.ndarray
"""The system's output response to `u`."""
du: np.ndarray
"""
The input increment trajectory for a controller that uses input increments
(`ext_out_incr_in = True`).
"""
N: int
"""The length of the initial input (`u`) and output (`y`) trajectories."""
L: int
"""The prediction horizon length."""
Q: np.ndarray
"""The output weighting matrix."""
R: np.ndarray
"""The input weighting matrix."""
S: np.ndarray
"""The output setpoint weighting matrix."""
y_r: np.ndarray
"""The system output setpoint."""
lamb_alpha: float
"""The ridge regularization weight for `alpha`."""
lamb_sigma: float
"""The ridge regularization weight for `sigma`."""
U: np.ndarray
"""
An array of shape (`m`, 2) containing the bounds for the `m` predicted
inputs. Each row specifies the `[min, max]` bounds for a single input.
"""
Us: np.ndarray
"""
An array of shape (`m`, 2) containing the bounds for the `m` predicted
input setpoints. `Us` must be a subset of `U`. Each row specifies the
`[min, max]` bounds for a single input.
"""
alpha_reg_type: AlphaRegType
"""
The alpha regularization type for the Nonlinear Data-Driven MPC
formulation.
"""
lamb_alpha_s: float
"""
The ridge regularization weight for `alpha_s` for a controller that uses an
approximation of `alpha_Lin^sr(D_t)` for the regularization of `alpha`.
"""
lamb_sigma_s: float
"""
The ridge regularization weight for `sigma_s` for a controller that uses an
approximation of `alpha_Lin^sr(D_t)` for the regularization of `alpha`.
"""
ext_out_incr_in: bool
"""
The controller structure:
- If `True`, the controller uses an extended output representation
(y_ext[k] = [y[k], u[k]]) and input increments (u[k] = u[k-1] + du[k-1]).
- If `False`, the controller operates as a standard controller with direct
control inputs and without system state extensions.
Defaults to `False`.
"""
update_cost_threshold: float
"""
The tracking cost value threshold. Online input-output data updates are
disabled when the tracking cost value is less than this value.
"""
n_mpc_step: int
"""
The number of consecutive applications of the optimal input for an n-Step
Data-Driven MPC Scheme (multi-step).
"""
def __init__(
self,
n: int,
m: int,
p: int,
u: np.ndarray,
y: np.ndarray,
L: int,
Q: np.ndarray,
R: np.ndarray,
S: np.ndarray,
y_r: np.ndarray,
lamb_alpha: float,
lamb_sigma: float,
U: np.ndarray,
Us: np.ndarray,
alpha_reg_type: AlphaRegType = AlphaRegType.ZERO,
lamb_alpha_s: float | None = None,
lamb_sigma_s: float | None = None,
ext_out_incr_in: bool = False,
update_cost_threshold: float | None = None,
n_mpc_step: int = 1,
):
"""
Initialize a Direct Nonlinear Data-Driven MPC with specified system
model parameters, an initial input-output data trajectory measured
from the system, and Nonlinear Data-Driven MPC parameters.
Note:
The input data `u` used to excite the system to get the initial
output data must be persistently exciting of order (L + n + 1), as
defined in the Data-Driven MPC formulation in [2].
Args:
n (int): The estimated order of the system.
m (int): The number of control inputs.
p (int): The number of system outputs.
u (np.ndarray): A persistently exciting input sequence.
y (np.ndarray): The system's output response to `u`.
L (int): The prediction horizon length.
Q (np.ndarray): The output weighting matrix for the MPC
formulation.
R (np.ndarray): The input weighting matrix for the MPC
formulation.
S (np.ndarray): The output setpoint weighting matrix for the MPC
formulation.
y_r (np.ndarray): The system output setpoint.
lamb_alpha (float): The ridge regularization weight for
`alpha`.
lamb_sigma (float): The ridge regularization weight for
`sigma`.
U (np.ndarray): An array of shape (`m`, 2) containing the bounds
for the `m` predicted inputs. Each row specifies the
`[min, max]` bounds for a single input.
Us (np.ndarray): An array of shape (`m`, 2) containing the bounds
for the `m` predicted input setpoints. `Us` must be a subset
of `U`. Each row specifies the `[min, max]` bounds for a
single input.
alpha_reg_type (AlphaRegType): The `alpha` regularization type
for the Nonlinear Data-Driven MPC formulation.
lamb_alpha_s (float | None): The ridge regularization weight for
`alpha_s` for a controller that uses an approximation of
`alpha_Lin^sr(D_t)` for the regularization of `alpha`.
lamb_sigma_s (float | None): The ridge regularization weight for
`sigma_s` for a controller that uses an approximation of
`alpha_Lin^sr(D_t)` for the regularization of `alpha`.
ext_out_incr_in (bool): The controller structure:
- If `True`, the controller uses an extended output
representation (y_ext[k] = [y[k], u[k]]) and input increments
(u[k] = u[k-1] + du[k-1]).
- If `False`, the controller operates as a standard controller
with direct control inputs and without system state
extensions.
Defaults to `False`.
update_cost_threshold (float | None): The tracking cost value
threshold. Online input-output data updates are disabled when
the tracking cost value is less than this value. If `None`,
input-output data is always updated online. Defaults to
`None`.
n_mpc_step (int): The number of consecutive applications of the
optimal input for an n-Step Data-Driven MPC Scheme
(multi-step). Defaults to 1.
References:
[2] J. Berberich, J. Köhler, M. A. Müller and F. Allgöwer, "Linear
Tracking MPC for Nonlinear Systems—Part II: The Data-Driven Case,"
in IEEE Transactions on Automatic Control, vol. 67, no. 9, pp.
4406-4421, Sept. 2022, doi: 10.1109/TAC.2022.3166851.
"""
# Define controller structure:
# - If `True`: The controller uses an extended output representation
# (y_ext[k] = [y[k], u[k]]) and input increments where
# the control input is updated incrementally as:
# u[k] = u[k-1] + du[k-1]. Based on Section V of [2].
# - If `False`: Standard controller operation. The controller uses
# direct control inputs without system state extensions.
self.ext_out_incr_in = ext_out_incr_in
# Define system model
self.n = n # Estimated system order
self.m = m # Number of inputs
self.p = p # Number of outputs
# Input-Output trajectory data
self.u = u.copy() # Input trajectory data
if self.ext_out_incr_in:
# Incremental Input trajectory data
# du[k] = u[k+1] - u[k]
du_last = np.zeros((self.m))
self.du = np.vstack([u[1:, :] - u[:-1, :], [du_last]])
# Output trajectory data
if self.ext_out_incr_in:
# Extended Output trajectory data
self.y = np.hstack([y, u])
else:
self.y = y.copy()
self.N = u.shape[0] # Initial input-output trajectory length
# Define Nonlinear Data-Driven MPC parameters
self.L = L # Prediction horizon
self.Q = Q # Output weighting matrix
self.R = R # Input weighting matrix
self.S = S # Output setpoint weighting matrix
self.y_r = y_r # System output setpoint
self.lamb_alpha = lamb_alpha # Ridge regularization weight for alpha
self.lamb_sigma = lamb_sigma # Ridge regularization weight for sigma
self.U = U # Bounds for the predicted input
self.Us = Us # Bounds for the predicted input setpoint
# Note: Us must be a subset of U.
# Alpha regularization type
self.alpha_reg_type = alpha_reg_type
# Parameters for the approximation of alpha_Lin^sr(D_t).
# Alpha is regularized w.r.t. this parameter, based on Remark 1
# of [2].
if alpha_reg_type == AlphaRegType.APPROXIMATED:
# Ridge regularization weight for alpha_s
self.lamb_alpha_s = lamb_alpha_s # type: ignore[assignment]
# Ridge regularization weight for sigma_s
self.lamb_sigma_s = lamb_sigma_s # type: ignore[assignment]
elif alpha_reg_type == AlphaRegType.PREVIOUS:
# Previous alpha value initialized with 0
self._prev_alpha_val = np.zeros((self.N - self.L - self.n, 1))
# Online input-output data updates
self.update_cost_threshold = (
update_cost_threshold if update_cost_threshold is not None else 0.0
)
self._update_data = True
# n-Step Data-Driven MPC Scheme parameters
self.n_mpc_step = n_mpc_step # Number of consecutive applications
# of the optimal input
# Define bounds for the predicted inputs and predicted input setpoints
self._U_const_low = np.tile(self.U[:, 0:1], (self.L + 1, 1))
self._U_const_up = np.tile(self.U[:, 1:2], (self.L + 1, 1))
self._Us_const_low = self.Us[:, 0:1]
self._Us_const_up = self.Us[:, 1:2]
# Define helper constants for MPC constraints definition
self._ones_1 = np.ones((1, 1)) # 1
self._ones_NLn = np.ones((1, self.N - self.L - self.n)) # 1_(N-L-n)^T
self._ones_Ln1 = np.ones((self.L + self.n + 1, 1)) # 1_(L+n+1)
self._ones_n1 = np.ones((n + 1, 1)) # 1_(n+1)
# Evaluate if input trajectory data is persistently exciting of
# order (L + n + 1)
self._evaluate_input_persistent_excitation()
# Check correct prediction horizon length and cost matrix dimensions
self._check_prediction_horizon_length()
self._check_weighting_matrices_dimensions()
# Initialize Data-Driven MPC controller
self._initialize_data_driven_mpc()
"""
Public methods
"""
def update_and_solve_data_driven_mpc(
self, warm_start: bool = False
) -> None:
"""
Update the Data-Driven MPC optimization parameters, solve the problem,
and store the optimal control input.
This method performs the following tasks:
1. Constructs Hankel matrices using the latest measured input-output
data. If the tracking cost value from the previous solution is
small enough (less than `update_cost_threshold`), omits this step
and the previously defined matrices are used.
2. Updates the MPC optimization parameters to use the latest
input-output measurements. Additionally, it updates the value of
`alpha_Lin^sr(D_t)` if `alpha` is not regularized with respect to
zero.
Note: The value of `alpha_Lin^sr(D_t)` is computed during the
optimization parameter update.
3. Solves the MPC problem and stores the resulting optimal control
input.
4. Stores the optimal value of `alpha` if `alpha` is regularized
with respect to its previous optimal value (see Section V of [2]).
5. Toggles online data updates based on the current tracking cost
value.
References:
[2]: See class-level docstring for full reference details.
"""
# Update data-driven constants online if data updates are enabled
# (current tracking cost value is not small enough)
if self._update_data:
if self.ext_out_incr_in:
# For a controller that uses an extended output representation
# and input increments, the input Hankel matrix corresponds to
# input increments instead of absolute inputs.
# H_{L+n+1}(du)
self._HLn1_u = hankel_matrix(self.du, self.L + self.n + 1)
else:
# H_{L+n+1}(u)
self._HLn1_u = hankel_matrix(self.u, self.L + self.n + 1)
# H_{L+n+1}(y)
self._HLn1_y = hankel_matrix(self.y, self.L + self.n + 1)
# Update MPC optimization parameters
#
# Note: The value of `alpha_Lin^sr(D_t)` is computed
# during the optimization parameter update.
self._update_optimization_parameters()
# Solve MPC problem and store the optimal input
self._solve_mpc_problem(warm_start=warm_start)
self._get_optimal_control_input()
# Update previous alpha value if the alpha
# regularization type is `PREVIOUS`
if self.alpha_reg_type == AlphaRegType.PREVIOUS:
self._store_previous_alpha_value()
# Toggle online data updates based on the tracking cost value
if self._tracking_cost.value <= self.update_cost_threshold:
self._update_data = False
else:
self._update_data = True
def get_problem_solve_status(self) -> str:
"""
Get the solve status of the optimization problem of the Data-Driven MPC
formulation.
Returns:
str: The status of the optimization problem after attempting to
solve it (e.g., "optimal", "optimal_inaccurate", "infeasible",
"unbounded").
"""
return self._problem.status
def get_optimal_cost_value(self) -> float:
"""
Get the cost value corresponding to the solved optimization problem of
the Data-Driven MPC formulation.
Returns:
float: The optimal cost value of the solved MPC optimization
problem.
"""
return self._problem.value
def get_optimal_control_input_at_step(self, n_step: int = 0) -> np.ndarray:
"""
Get the optimal control input (`u`) from the MPC solution
corresponding to a specified time step in the prediction horizon
[0, L].
Args:
n_step (int): The time step of the optimal control input to
retrieve. It must be within the range [0, L].
Returns:
np.ndarray: An array containing the optimal control input for the
specified prediction time step.
Note:
This method assumes that the optimal control input from the MPC
solution has been stored in the `optimal_u` attribute. For a
controller that uses an extended output representation and input
increments, the last `du` value should contain the optimal control
input increment computed from the previous MPC solution
(`optimal_du[:m]`).
Raises:
ValueError: If `n_step` is not within the range [0, L].
"""
# Ensure n_step is within prediction range [0,L]
if not 0 <= n_step <= self.L:
raise ValueError(
f"The specified prediction time step ({n_step}) is out of "
f"range. It should be within [0, {self.L}]."
)
if self.ext_out_incr_in:
# For a controller that uses an extended output representation and
# input increments, the optimal value computed in the current
# step corresponds to the input increment `du[k]`. This value is
# stored in `du` and is used in the next iteration, not the
# current one. This is because, in our formulation, input
# increments are defined as du[k] = u[k+1] - u[k].
# Calculate the optimal input step considering the control input
# u[k] = u[k-1] + du[k-1]
u_k1 = self.u[-1:]
du_k1 = self.du[-1:]
optimal_u_step_n = u_k1 + du_k1
else:
optimal_u_step_n = self._optimal_u[
n_step * self.m : (n_step + 1) * self.m
]
return optimal_u_step_n
def get_du_value_at_step(self, n_step: int = 0) -> np.ndarray | None:
"""
Get the optimal control input increment (`du`) from the MPC solution
corresponding to a specified time step in the prediction horizon
[0, L].
Args:
n_step (int): The time step of the optimal control input to
retrieve. It must be within the range [0, L].
Returns:
np.ndarray | None: An array containing the optimal control
input increment for the specified prediction time step if the
controller uses an extended output representation and input
increments. Otherwise, returns `None`.
Note:
This method assumes that the `optimal_du` attribute contains the
optimal control input increments from the MPC solution.
"""
if self.ext_out_incr_in:
return self._optimal_du[n_step * self.m : (n_step + 1) * self.m]
else:
return None
def store_input_output_measurement(
self,
u_current: np.ndarray,
y_current: np.ndarray,
du_current: np.ndarray | None = None,
) -> None:
"""
Store an input-output measurement pair for the current time step in
the input-output storage variables. If the controller uses an extended
output representation and input increments, the input increment
corresponding to the current input measurement is also stored.
This method updates the input-output storage variables `u`, `y` and
`du` by shifting the arrays and replacing the oldest measurements with
the current ones.
Args:
u_current (np.ndarray): The control input (u[k]) for the current
time step, expected to match the dimensions of prior inputs.
y_current (np.ndarray): The measured system output for the current
time step, expected to match the dimensions of prior outputs.
This output should correspond to the system's response to
`u_current`, as both represent a trajectory of the system.
du_current (np.ndarray | None): The control input increment
(du[k] = u[k+1] - u[k]) for the current time step, expected to
match the dimensions of prior inputs.
Raises:
ValueError: If `u_current`, `y_current`, or `du_current` do not
match the expected dimensions.
Note:
This method updates the `u`, `y`, and `du` arrays.
References:
[2]: See class-level docstring for full reference details.
"""
# Check measurement dimensions
expected_u0_dim = (self.m,)
expected_y0_dim = (self.p,)
if (
u_current.shape != expected_u0_dim
or y_current.shape != expected_y0_dim
):
raise ValueError(
"Incorrect dimensions. Expected dimensions are "
f"{expected_u0_dim} for u_current and {expected_y0_dim} for "
f"y_current, but got {u_current.shape} and {y_current.shape} "
"instead."
)
if self.ext_out_incr_in:
if du_current is None:
raise ValueError(
"A valid `du_current` value is required for a controller "
"that uses an extended output representation and input "
"increments."
)
if du_current.shape != expected_u0_dim:
raise ValueError(
"Incorrect dimensions for `du_current`. Expected "
f"dimensions are {expected_u0_dim}, but got "
f"{du_current.shape} instead."
)
# Shift and update control inputs
self.u[:-1] = self.u[1:]
self.u[-1:] = u_current
# Shift and update input increments if used
if self.ext_out_incr_in:
self.du[:-1] = self.du[1:]
self.du[-1:] = du_current
# Shift and update outputs
self.y[:-1] = self.y[1:]
if self.ext_out_incr_in:
# Update output considering the extended output
# representation (y_ext[k] = [y[k], u[k]])
self.y[-1:, : self.p] = y_current # Store system output
self.y[-1:, self.p :] = u_current # Store control input
else:
self.y[-1:] = y_current
def set_input_output_data(
self,
u: np.ndarray,
y: np.ndarray,
) -> None:
"""
Set the variables for the current input-output measurements.
This method assigns the provided input-output measurements to the
arrays storing the current input-output measurements, `u` and `y`.
Args:
u (np.ndarray): An array containing the current measured control
input. Expected to have a shape of (N, m), where 'N' is the
trajectory length and 'm' is the dimension of the input.
y (np.ndarray): An array containing the current measured system
output. Expected to have a shape of (N, p) where 'N' is the
trajectory length and 'p' is the dimension of the output.
Raises:
ValueError: If `u` or `y` do not have correct dimensions.
Note:
This method sets the values of the `u` and `y` attributes with the
provided new input-output data.
"""
# Validate input types and dimensions
expected_u_dim = (self.N, self.m)
expected_y_dim = (self.N, self.p)
if u.shape != expected_u_dim:
raise ValueError(
f"Incorrect dimensions. `u` must have shape {expected_u_dim}, "
f"but got {u.shape} instead."
)
if y.shape != expected_y_dim:
raise ValueError(
f"Incorrect dimensions. `y` must have shape {expected_y_dim}, "
f"but got {y.shape} instead."
)
# Update input-output trajectory data
self.u = u.copy() # Input trajectory data
if self.ext_out_incr_in:
# Incremental Input trajectory data
du_last = np.zeros((self.m))
self.du = np.vstack([u[1:, :] - u[:-1, :], [du_last]])
# Output trajectory data
if self.ext_out_incr_in:
# Extended Output trajectory data
self.y = np.hstack([y, u])
else:
self.y = y.copy()
def set_output_setpoint(self, y_r: np.ndarray) -> None:
"""
Set the system output setpoint of the Data-Driven MPC controller.
Args:
y_r (np.ndarray): The setpoint for system outputs.
Raises:
ValueError: If `y_r` does not have the expected dimensions.
Note:
This method sets the values of the `y_r` attribute with the
provided new setpoint and updates the value of `y_r_param`
to update the data-driven MPC controller setpoint.
"""
# Validate input types and dimensions
if y_r.shape != self.y_r.shape:
raise ValueError(
"Incorrect dimensions. `y_r` must have shape "
f"{self.y_r.shape}, but got {y_r.shape} instead."
)
# Update output setpoint and its parameter value
self.y_r = y_r
self._y_r_param.value = y_r
"""
Internal methods
"""
def _evaluate_input_persistent_excitation(self) -> None:
"""
Evaluate whether the input data is persistently exciting of order
(L + n + 1).
This method first verifies that the length of the elements in the
input data matches the number of inputs of the system. Then, it
evaluates the rank of the Hankel matrix induced by the input sequence
to determine if the input sequence is persistently exciting of order
(L + n + 1), as described in Definition 1 [2].
Raises:
ValueError: If the length of the elements in the data sequence
does not match the number of inputs of the system, or if the
input data is not persistently exciting of order (L + n + 1).
References:
[2]: See class-level docstring for full reference details.
"""
# Get the length of the elements of the data sequence
u_n = self.u.shape[1] # m - Number of inputs
# Check if the number of inputs matches the expected
# number of inputs of the system
if u_n != self.m:
raise ValueError(
f"The length of the elements of the data sequence ({u_n}) "
f"should match the number of inputs of the system ({self.m})."
)
# Evaluate if input data is persistently exciting of order (L + n + 1)
# based on the rank of its induced Hankel matrix
expected_order = self.L + self.n + 1
in_hankel_rank, in_pers_exc = evaluate_persistent_excitation(
X=self.u, order=expected_order
)
if not in_pers_exc:
raise ValueError(
"Initial input trajectory data is not persistently exciting "
"of order (L + n + 1). The rank of its induced Hankel matrix "
f"({in_hankel_rank}) does not match the expected rank ("
f"{u_n * expected_order})."
)
def _check_prediction_horizon_length(self) -> None:
"""
Check if the prediction horizon length, `L`, satisfies the MPC system
design, as described in [2].
Raises:
ValueError: If the prediction horizon `L` is less than the
required threshold based on the controller type.
References:
[2]: See class-level docstring for full reference details.
"""
if self.L < self.n:
raise ValueError(
"The prediction horizon (`L`) must be greater than or equal "
"to the estimated system order `n`."
)
def _check_weighting_matrices_dimensions(self) -> None:
"""
Check if the dimensions of the output (`Q`), input (`R`), and setpoint
(`S`) weighting matrices are correct for an MPC formulation based on
their order.
Raises:
ValueError: If the dimensions of the `Q`, `R`, or `S` matrices are
incorrect.
"""
if self.ext_out_incr_in:
expected_output_shape = (
(self.m + self.p) * (self.L + self.n + 1),
(self.m + self.p) * (self.L + self.n + 1),
)
else:
expected_output_shape = (
self.p * (self.L + self.n + 1),
self.p * (self.L + self.n + 1),
)
expected_input_shape = (
self.m * (self.L + self.n + 1),
self.m * (self.L + self.n + 1),
)
expected_setpoint_shape = (self.p, self.p)
if self.Q.shape != expected_output_shape:
if self.ext_out_incr_in:
raise ValueError(
"Output weighting square matrix Q should be of order "
"((m + p) * (L + n + 1))."
)
else:
raise ValueError(
"Output weighting square matrix Q should be of order "
"(p * (L + n + 1))."
)
if self.R.shape != expected_input_shape:
raise ValueError(
"Input weighting square matrix R should be of order "
"(m * (L + n + 1))."
)
if self.S.shape != expected_setpoint_shape:
raise ValueError(
"Output setpoint weighting square matrix S should be of "
"order (p)."
)
def _initialize_data_driven_mpc(self) -> None:
"""
Initialize the Data-Driven MPC controller.
This method performs the following tasks:
1. Constructs Hankel matrices from the initial input-output trajectory
data (`u`, `y`). These matrices are used for the data-driven
characterization of the unknown system, as defined by the system
dynamic constraints in the Nonlinear Data-Driven MPC formulation
of [2].
2. Defines the optimization variables for the Data-Driven MPC problem.
3. Defines the constraints for the MPC problem, which include the
system dynamics, internal state, terminal state, and input
constraints.
4. Defines the cost function for the MPC problem.
5. Formulates the MPC problem as a Quadratic Programming (QP) problem.
6. Solves the initialized MPC problem to ensure the formulation is
valid and retrieve the optimal control input for the initial
setup.
This initialization process ensures that all necessary components for
the Data-Driven MPC are correctly defined and that the MPC problem is
solvable with the provided initial data.
References:
[2]: See class-level docstring for full reference details.
"""
# Construct Hankel Matrices from the initial input-output trajectory
# for the data-driven characterization of the unknown system.
# Used for the system dynamic constraints defined by Equation (22b).
if self.ext_out_incr_in:
# For a controller that uses an extended output representation
# and input increments, the input Hankel matrix corresponds to
# input increments instead of absolute inputs.
# H_{L+n+1}(du)
self._HLn1_u = hankel_matrix(self.du, self.L + self.n + 1)
else:
# H_{L+n+1}(u)
self._HLn1_u = hankel_matrix(self.u, self.L + self.n + 1)
# H_{L+n+1}(y)
self._HLn1_y = hankel_matrix(self.y, self.L + self.n + 1)
# Define the Data-Driven MPC problem
self._define_optimization_variables()
self._define_optimization_parameters()
if self.alpha_reg_type == AlphaRegType.APPROXIMATED:
# Define the QP problem for computing an approximation of
# `alpha_Lin^sr(D_t)` if `alpha` is regularized with respect
# to this approximation
self._define_alpha_sr_Lin_Dt_prob()
self._update_optimization_parameters()
self._define_mpc_constraints()
self._define_cost_function()
self._define_mpc_problem()
# Validate the Data-Driven MPC formulation with an initial solution
self._solve_mpc_problem()
self._get_optimal_control_input()
def _define_optimization_variables(self) -> None:
"""
Define the optimization variables for the Data-Driven MPC controller.
This method defines data-driven MPC optimization variables as
described in the Nonlinear Data-Driven MPC formulation in [2].
Note:
This method initializes attributes that define the MPC
optimization variables for the controller. Additional attributes
are initialized if the MPC controller uses an extended output
representation and input increments or if the `alpha`
regularization type is `APPROXIMATED`.
References:
[2]: See class-level docstring for full reference details.
"""
# alpha(t)
self._alpha = cp.Variable((self.N - self.L - self.n, 1))
# sigma(t)
if self.ext_out_incr_in:
self._sigma = cp.Variable(
((self.L + self.n + 1) * (self.m + self.p), 1)
)
else:
self._sigma = cp.Variable(((self.L + self.n + 1) * self.p, 1))
# ubar[-n, L](t)
self._ubar = cp.Variable(((self.L + self.n + 1) * self.m, 1))
# For a controller that uses an extended output representation and
# input increments, `ubar` corresponds to input increments (du[k])
# and not absolute control inputs (u[k]).
# ybar[-n, L](t)
if self.ext_out_incr_in:
self._ybar = cp.Variable(
((self.L + self.n + 1) * (self.m + self.p), 1)
)
else:
self._ybar = cp.Variable(((self.L + self.n + 1) * self.p, 1))
# u_s(t)
self._u_s = cp.Variable((self.m, 1))
# y_s(t)
if self.ext_out_incr_in:
self._y_s = cp.Variable((self.m + self.p, 1))
else:
self._y_s = cp.Variable((self.p, 1))
# Define variable segments for constraint definitions
# Internal state constraints: Get initial `n` input-output predictions
# ubar[-n, -1]
self._ubar_state = self._ubar[: self.n * self.m]
# ybar[-n, -1]
if self.ext_out_incr_in:
self._ybar_state = self._ybar[: self.n * (self.m + self.p)]
else:
self._ybar_state = self._ybar[: self.n * self.p]
# Terminal state constraints:
# Get terminal segments of input-output predictions
# ubar[L-n, L]
self._ubar_terminal = self._ubar[self.L * self.m :]
# ybar[L-n, L]
if self.ext_out_incr_in:
self._ybar_terminal = self._ybar[
-(self.n + 1) * (self.m + self.p) :
]
else:
self._ybar_terminal = self._ybar[self.L * self.p :]
# Input constraints:
# Get absolute input variable indices (u[k])
self._ubar_pred: cp.Expression
if self.ext_out_incr_in:
# Get `ubar` predicted input considering the extended output
# representation: ybar_ext = [ybar, ubar]
start_indices = np.arange(
self.n * (self.m + self.p) + self.p,
self._ybar.shape[0],
self.p + self.m,
)
slices = [
self._ybar[start : start + self.m] for start in start_indices
]
self._ubar_pred = cp.vstack(slices) # ubar[0,L] excluding output
else:
self._ubar_pred = self._ubar[self.n * self.m :] # ubar[0,L]
# sigma_ubar: The sigma values corresponding to the input when using a
# controller with an extended output representation (ybar_ext =
# [ybar, ubar]). Used to constrain the input sigma values to 0, since
# the sigma values should only affect the output and not the input.
if self.ext_out_incr_in:
start_indices = np.arange(
self.p, self._sigma.shape[0], self.p + self.m
)
slices = [
self._sigma[start : start + self.m] for start in start_indices
]
self._sigma_ubar = cp.vstack(slices)
# Cost function related variables:
# Define predicted input-output setpoint variable arrays
# by repetition for cost function
self._u_s_tiled = cp.vstack(
[self._u_s for _ in range(self.L + self.n + 1)]
)
self._y_s_tiled = cp.vstack(
[self._y_s for _ in range(self.L + self.n + 1)]
)
# Define optimization variables for computing the approximation of
# alpha_Lin^sr(D_t) by solving Equation (23) of [2] if the alpha
# regularization type is APPROXIMATED (regularization w.r.t. an
# approximated alpha_Lin^sr(D_t) value).
if self.alpha_reg_type == AlphaRegType.APPROXIMATED:
# alpha_s
self._alpha_s = cp.Variable((self.N - self.L - self.n, 1))
# sigma_s
if self.ext_out_incr_in:
self._sigma_s = cp.Variable(
((self.L + self.n + 1) * (self.m + self.p), 1)
)
else:
self._sigma_s = cp.Variable(
((self.L + self.n + 1) * self.p, 1)
)
# sigma_s_ubar: The sigma_s values corresponding to the input.
# Refer to sigma_ubar definition.
if self.ext_out_incr_in:
start_indices = np.arange(
self.p, self._sigma_s.shape[0], self.p + self.m
)
slices = [
self._sigma_s[start : start + self.m]
for start in start_indices
]
self._sigma_s_ubar = cp.vstack(slices)
def _define_optimization_parameters(self) -> None:
"""
Define MPC optimization parameters that are updated at every step
iteration.