Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# ------------------------------------------------------------------------------ 

2# Copyright (C) 2020 Maximilian Stahlberg 

3# 

4# This file is part of PICOS. 

5# 

6# PICOS is free software: you can redistribute it and/or modify it under the 

7# terms of the GNU General Public License as published by the Free Software 

8# Foundation, either version 3 of the License, or (at your option) any later 

9# version. 

10# 

11# PICOS is distributed in the hope that it will be useful, but WITHOUT ANY 

12# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

13# A PARTICULAR PURPOSE. See the GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License along with 

16# this program. If not, see <http://www.gnu.org/licenses/>. 

17# ------------------------------------------------------------------------------ 

18 

19"""Implements :class:`WassersteinAmbiguousExtremumAffineConstraint`.""" 

20 

21from collections import namedtuple 

22 

23from ... import glyphs 

24from ...apidoc import api_end, api_start 

25from ...caching import cached_property 

26from ..constraint import Constraint, ConstraintConversion 

27 

28_API_START = api_start(globals()) 

29# ------------------------------- 

30 

31 

32class WassersteinAmbiguousExtremumAffineConstraint(Constraint): 

33 """A bound on a W_1-ambiguous expected value of a piecewise function.""" 

34 

35 class DistributionallyRobustConversion(ConstraintConversion): 

36 """Distributionally robust counterpart conversion.""" 

37 

38 @classmethod 

39 def predict(cls, subtype, options): 

40 """Implement :meth:`~.constraint.ConstraintConversion.predict`.""" 

41 from ...expressions import RealVariable 

42 from .. import AffineConstraint, SOCConstraint 

43 

44 k = subtype.extremum_argnum 

45 m = subtype.universe_subtype.sample_dim 

46 N = subtype.universe_subtype.sample_num 

47 

48 yield ("var", RealVariable.make_var_type(dim=1, bnd=0), 1) # gamma 

49 yield ("var", RealVariable.make_var_type(dim=N, bnd=0), 1) # s 

50 

51 yield ("con", AffineConstraint.make_type(dim=1, eq=False), 1) 

52 yield ("con", AffineConstraint.make_type(dim=k, eq=False), N) 

53 yield ("con", SOCConstraint.make_type(argdim=m), k) 

54 

55 @classmethod 

56 def convert(cls, con, options): 

57 """Implement :meth:`~.constraint.ConstraintConversion.convert`.""" 

58 # The conversion recipe is found in "Robust conic optimization in 

59 # Python" (Stahlberg 2020) and is an application of a result in 

60 # "Data-driven distributionally robust optimization using the 

61 # Wasserstein metric: Performance guarantees and tractable 

62 # reformulations" (Esfahani Mohajerin and Kuhn 2018). 

63 from ...expressions import ( 

64 Constant, RandomMaximumAffine, RealVariable, SecondOrderCone) 

65 from ...expressions.algebra import block 

66 from ...modeling import Problem 

67 

68 # Load the constraint in maximum form. 

69 con = con.maximum_form 

70 assert isinstance(con.extremum, RandomMaximumAffine) 

71 assert con.relation == con.LE 

72 k = con.extremum.argnum 

73 K = range(k) 

74 

75 # Load the ambiguity set. 

76 WAS = con.extremum.universe 

77 xi = WAS.parameter 

78 S = WAS.samples 

79 m = S.dim 

80 N = S.num 

81 w = WAS.weights 

82 eps = WAS.eps 

83 

84 # Load the uncertain extremum as max(h[i].T*xi + eta[i] for i in K). 

85 zero = Constant(0, shape=(1, m)) 

86 hT, eta = zip(*( 

87 (zero, a) if a.certain else a.factor_out(xi) 

88 for a in con.extremum.expressions)) 

89 

90 # Stack the h[i].T and eta[i] vertically and to allow N many 

91 # k-dimensional constraints as opposed to N*k scalar constraints. 

92 # TODO: Consider adding picos.algebra.hcat and picos.algebra.vcat. 

93 hTs = block([[hT[i]] for i in K]) 

94 etas = block([[eta[i]] for i in K]) 

95 

96 # Load the upper bound. 

97 omega = con.rhs 

98 

99 P = Problem() 

100 

101 gamma = RealVariable("__gamma") 

102 s = RealVariable("__s", N) 

103 

104 P.add_constraint(gamma*eps + w.T*s <= omega) 

105 

106 for j in range(N): 

107 P.add_constraint(hTs*S[j] + etas <= s[j].dupvec(k)) 

108 

109 for i in K: 

110 P.add_constraint((gamma & hT[i]) << SecondOrderCone()) 

111 

112 return P 

113 

114 def __init__(self, extremum, relation, rhs): 

115 """Construct a :class:`WassersteinAmbiguousExtremumAffineConstraint`. 

116 

117 :param ~picos.expressions.RandomExtremumAffine extremum: 

118 Left hand side expression. 

119 :param str relation: 

120 Constraint relation symbol. 

121 :param ~picos.expressions.AffineExpression rhs: 

122 Right hand side expression. 

123 """ 

124 from ...expressions import ( 

125 AffineExpression, RandomMaximumAffine, RandomMinimumAffine) 

126 

127 if relation == self.LE: 

128 assert isinstance(extremum, RandomMaximumAffine) 

129 else: 

130 assert isinstance(extremum, RandomMinimumAffine) 

131 assert isinstance(rhs, AffineExpression) 

132 assert rhs.scalar 

133 

134 self.extremum = extremum 

135 self.relation = relation 

136 self.rhs = rhs 

137 

138 super(WassersteinAmbiguousExtremumAffineConstraint, self).__init__( 

139 "Wasserstein-ambiguous Piecewise Linear Expectation", 

140 printSize=True) 

141 

142 @cached_property 

143 def maximum_form(self): 

144 """The constraint posed as an upper bound on an expected maximum.""" 

145 if self.relation == self.LE: 

146 return self 

147 else: 

148 return self.__class__(-self.extremum, self.LE, -self.rhs) 

149 

150 Subtype = namedtuple("Subtype", ( 

151 "extremum_argnum", 

152 "universe_subtype")) 

153 

154 def _subtype(self): 

155 return self.Subtype( 

156 self.extremum.argnum, 

157 self.extremum.universe.subtype) 

158 

159 @classmethod 

160 def _cost(cls, subtype): 

161 return float("inf") 

162 

163 def _expression_names(self): 

164 yield "extremum" 

165 yield "rhs" 

166 

167 def _str(self): 

168 if self.relation == self.LE: 

169 return glyphs.le( 

170 self.extremum.worst_case_string("max"), self.rhs.string) 

171 else: 

172 return glyphs.ge( 

173 self.extremum.worst_case_string("min"), self.rhs.string) 

174 

175 def _get_size(self): 

176 return (1, 1) 

177 

178 def _get_slack(self): 

179 if self.relation == self.LE: 

180 return self.rhs.safe_value - self.extremum.worst_case_value("max") 

181 else: 

182 return self.extremum.worst_case_value("min") - self.rhs.safe_value 

183 

184 

185# -------------------------------------- 

186__all__ = api_end(_API_START, globals())