Source code for picos.solvers.solver_osqp

# ------------------------------------------------------------------------------
# Copyright (C) 2021-2022 Maximilian Stahlberg
#
# This file is part of PICOS.
#
# PICOS is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# PICOS is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
# ------------------------------------------------------------------------------

"""Implementation of :class:`OSQPSolver`."""

import cvxopt
import numpy

from ..apidoc import api_end, api_start
from ..constraints import AffineConstraint, DummyConstraint
from ..expressions import (CONTINUOUS_VARTYPES, AffineExpression,
                           QuadraticExpression)
from ..expressions.data import cvx2csc
from ..modeling.footprint import Specification
from ..modeling.solution import (PS_FEASIBLE, PS_INFEASIBLE, PS_UNBOUNDED,
                                 PS_UNKNOWN, SS_FAILURE, SS_INFEASIBLE,
                                 SS_OPTIMAL, SS_PREMATURE, SS_UNKNOWN)
from .solver import Solver

_API_START = api_start(globals())
# -------------------------------


[docs]class OSQPSolver(Solver): """Interface to the OSQP solver.""" SUPPORTED = Specification( objectives=[ AffineExpression, QuadraticExpression], variables=CONTINUOUS_VARTYPES, constraints=[ DummyConstraint, AffineConstraint])
[docs] @classmethod def supports(cls, footprint, explain=False): """Implement :meth:`~.solver.Solver.supports`.""" result = Solver.supports(footprint, explain) if not result or (explain and not result[0]): return result if footprint.nonconvex_quadratic_objective: if explain: return (False, "QPs with a nonconvex objective.") else: return False if footprint not in cls.SUPPORTED: if explain: return False, cls.SUPPORTED.mismatch_reason(footprint) else: return False return (True, None) if explain else True
[docs] @classmethod def default_penalty(cls): """Implement :meth:`~.solver.Solver.default_penalty`.""" # OSQP is an established free/open source solver but it has issues with # moderate numbers of affine inequalities, so leave it to user choice. return 2.0
[docs] @classmethod def test_availability(cls): """Implement :meth:`~.solver.Solver.test_availability`.""" cls.check_import("osqp")
[docs] @classmethod def names(cls): """Implement :meth:`~.solver.Solver.names`.""" return "osqp", "OSQP", "Operator Splitting QP Solver", None
[docs] @classmethod def is_free(cls): """Implement :meth:`~.solver.Solver.is_free`.""" return True
[docs] def __init__(self, problem): """Initialize a OSQP solver interface. :param ~picos.Problem problem: The problem to be solved. """ super(OSQPSolver, self).__init__(problem) self._numVars = 0 """Total number of scalar variables passed to OSQP.""" self._osqpVarOffset = {} """Maps a PICOS variable to its column in the constraint matrix.""" self._osqpConOffset = {} """Maps a PICOS constraint to its row in the constraint matrix.""" self._objectiveOffset = 0.0 """Objective function constant offset."""
[docs] def reset_problem(self): """Implement :meth:`~.solver.Solver.reset_problem`.""" self.int = None self._numVars = 0 self._osqpVarOffset.clear() self._osqpConOffset.clear() self._objectiveOffset = 0.0
def _affine_expression_to_G_and_h(self, expression): assert isinstance(expression, AffineExpression) return expression.scipy_sparse_matrix_form( varOffsetMap=self._osqpVarOffset, dense_b=True) _Gh = _affine_expression_to_G_and_h def _import_variables(self): offset = 0 for variable in self.ext.variables.values(): dim = variable.dim # Register the variable. self._osqpVarOffset[variable] = offset offset += dim assert offset == self._numVars # Add variable bounds as affine constraints. for variable in self.ext.variables.values(): # TODO: Import lower and upper bound in a single constraint instead. bounds = variable.bound_constraint if bounds: self._import_affine_constraint(bounds) def _import_objective(self): direction, objective = self.ext.no # OSQP only supports minimization; flip the sign for maximization. if direction == "max": objective = -objective # Split objective into quadratic and affine part. if isinstance(objective, AffineExpression): sparse_quads = {} affine_part = objective elif isinstance(objective, QuadraticExpression): sparse_quads = objective._sparse_quads affine_part = objective.aff else: assert False, "Unexpected objective." # Import quadratic part. for xy, Q in sparse_quads.items(): x, y = xy m, n = x.dim, y.dim dx = self._osqpVarOffset[x] dy = self._osqpVarOffset[y] # OSQP reads only the upper triangular part. if dx > dy: dx, dy = dy, dx m, n = n, m Q = Q.T # OSQP adds a factor of 0.5; cancel it. Q = 2*Q # Convert from cvxopt sparse to scipy sparse matrix. Q = cvx2csc(Q) self.int["P"][dx:dx + m, dy:dy + n] = Q # Import linear part. q, c = self._Gh(affine_part) self.int["q"] = numpy.ravel(q.todense()) self._objectiveOffset = float(c[0]) def _import_affine_constraint(self, constraint): from scipy import sparse assert isinstance(constraint, AffineConstraint) A, minus_b = self._Gh(constraint.lmr) b = -minus_b if constraint.is_equality(): l = u = b elif constraint.is_increasing(): l = numpy.full(len(b), float("-inf")) u = b elif constraint.is_decreasing(): l = b u = numpy.full(len(b), float("+inf")) self._osqpConOffset[constraint] = len(self.int["l"]) # TODO: Add matrices to a list and concatenate them at once later. self.int["A"] = sparse.vstack([self.int["A"], A], format="csc") self.int["l"] = numpy.concatenate([self.int["l"], l]) self.int["u"] = numpy.concatenate([self.int["u"], u]) def _import_constraint(self, constraint): if isinstance(constraint, AffineConstraint): self._import_affine_constraint(constraint) else: assert isinstance(constraint, DummyConstraint), \ "Unexpected constraint type: {}".format( constraint.__class__.__name__) def _import_problem(self): from scipy import sparse self._numVars = n = sum(var.dim for var in self.ext.variables.values()) # OSQP's internal problem representation is stateful but supports # updates only by setting or replacing whole matrices and not on a # per-variable or per-constraint basis. We thus pretend it was stateless # and use the osqp.solve function in a similar manner as with CVXOPT. # TODO: Consider supporting the limited update capabilities of OSQP. self.int = { # Objective function quadratic form. # NOTE: lil_matrix for cheap updates, converted to csc_matrix later. "P": sparse.lil_matrix((n, n)), # Objective function linear coefficients. "q": numpy.zeros(n), # Linear inequality coefficient matrix. "A": sparse.csc_matrix((0, n)), # Linear inequality lower bound. "l": numpy.zeros(0), # Linear inequality upper bound. "u": numpy.zeros(0), } # Import variables without their bounds. self._import_variables() # Set objective. self._import_objective() # Import constraints. for constraint in self.ext.constraints.values(): self._import_constraint(constraint) # Convert from LIL to CSC manually to avoid a warning by OSQP. self.int["P"] = self.int["P"].tocsc() def _update_problem(self): raise NotImplementedError def _solve(self): import osqp options = {} # verbosity options["verbose"] = (self.verbosity() >= 1) # abs_prim_fsb_tol if self.ext.options.abs_prim_fsb_tol is not None: options["eps_prim_inf"] = self.ext.options.abs_prim_fsb_tol # abs_dual_fsb_tol if self.ext.options.abs_dual_fsb_tol is not None: options["eps_dual_inf"] = self.ext.options.abs_dual_fsb_tol # abs_ipm_opt_tol if self.ext.options.abs_ipm_opt_tol is not None: options["eps_abs"] = self.ext.options.abs_ipm_opt_tol # rel_ipm_opt_tol if self.ext.options.rel_ipm_opt_tol is not None: options["eps_rel"] = self.ext.options.rel_ipm_opt_tol # max_iterations # NOTE: OSQP can hang long already for moderately sized LPs but we still # "remove" the iteration limit to obey the PICOS setting. This is # fine as long as OSQP requires user selection. if self.ext.options.max_iterations is not None: options["max_iter"] = self.ext.options.max_iterations else: options["max_iter"] = int(1e9) # timelimit if self.ext.options.timelimit is not None: options["time_limit"] = self.ext.options.timelimit # Enable polishing to increase chance of obeying precision limits. options["polish"] = True # Handle OSQP-specific options. options.update(self.ext.options.osqp_params) # Handle unsupported options. # TODO: Support hotstart. self._handle_unsupported_options("lp_root_method", "lp_node_method", "treememory", "max_fsb_nodes", "hotstart") # Attempt to solve the problem. with self._header(), self._stopwatch(): # NOTE: There is supposed to be a direct function osqp.solve but it # does not exist for my installation of 0.6.2. # result = osqp.solve(**self.int, **options) model = osqp.OSQP() model.setup(**self.int, **options) try: result = model.solve() except ValueError as error: if str(error) == "OSQP solve error!": result = None else: raise # Retrieve primals. primals = {} if result and self.ext.options.primals is not False: for variable in self.ext.variables.values(): offset = self._osqpVarOffset[variable] primal = list(result.x[offset:offset + variable.dim]) if None in primal: primal = None else: primal = cvxopt.matrix(primal) primals[variable] = primal # Retrieve duals. duals = {} if result and self.ext.options.duals is not False: for constraint in self.ext.constraints.values(): if isinstance(constraint, DummyConstraint): duals[constraint] = cvxopt.spmatrix( [], [], [], constraint.size) continue assert isinstance(constraint, AffineConstraint) offset = self._osqpConOffset[constraint] length = len(constraint) dual = list(result.y[offset:offset + length]) if None in dual: dual = None else: dual = cvxopt.matrix(dual) if not constraint.is_increasing(): dual = -dual duals[constraint] = dual # Retrieve objective value. value = result.info.obj_val if result else None if value is not None: # Add back the constant part. value += self._objectiveOffset # Flip back the sign for maximization. if self.ext.no.direction == "max": value = -value # Retrieve solution status. status = result.info.status if result else None if status is None: primalStatus = SS_FAILURE dualStatus = SS_FAILURE problemStatus = PS_UNKNOWN elif status == "solved": primalStatus = SS_OPTIMAL dualStatus = SS_OPTIMAL problemStatus = PS_FEASIBLE elif status == "primal infeasible": primalStatus = SS_INFEASIBLE dualStatus = SS_UNKNOWN problemStatus = PS_INFEASIBLE elif status == "dual infeasible": primalStatus = SS_UNKNOWN dualStatus = SS_INFEASIBLE problemStatus = PS_UNBOUNDED elif status in ("maximum iterations reached", "run time limit reached"): primalStatus = SS_PREMATURE dualStatus = SS_PREMATURE problemStatus = PS_UNKNOWN else: assert False, "Unknown solver status '{}'".format(status) return self._make_solution( value, primals, duals, primalStatus, dualStatus, problemStatus, {"osqp_info": result.info if result else None})
# -------------------------------------- __all__ = api_end(_API_START, globals())