Source code for picos.modeling.file_in

# ------------------------------------------------------------------------------
# Copyright (C) 2012-2017 Guillaume Sagnol
# Copyright (C)      2019 Maximilian Stahlberg
#
# This file is part of PICOS.
#
# PICOS is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# PICOS is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
# ------------------------------------------------------------------------------

"""Functions for reading optimization problems from a file."""

# ------------------------------------------------------------------------------
# TODO: This file is using ad-hoc parser implementations. Use a proper parser
#       generation library and a unified 'read' function for different formats.
# ------------------------------------------------------------------------------

import warnings

import cvxopt
import numpy

from ..apidoc import api_end, api_start
from ..expressions import AffineExpression, Constant
from .problem import Problem

_API_START = api_start(globals())
# -------------------------------


def _spmatrix(*args, **kwargs):
    """Create a CVXOPT sparse matrix.

    A wrapper around :func:`cvxopt.spmatrix` that converts indices to
    :class:`int`, if necessary.

    Works around PICOS sometimes passing indices as :class:`numpy:numpy.int64`.
    """
    try:
        return cvxopt.spmatrix(*args, **kwargs)
    except TypeError as error:
        # CVXOPT does not like NumPy's int64 scalar type for indices, so attempt
        # to convert all indices to Python's int.
        if str(error) == "non-numeric type in list":
            newargs = list(args)

            for argNum, arg in enumerate(args):
                if argNum in (1, 2):  # Positional I, J.
                    newargs[argNum] = [int(x) for x in args[argNum]]

            for kw in "IJ":
                if kw in kwargs:
                    kwargs[kw] = [int(x) for x in kwargs[kw]]

            return cvxopt.spmatrix(*newargs, **kwargs)
        else:
            raise


def _break_cols(mat, sizes):
    """Help with the reading of CBF files."""
    n = len(sizes)
    I, J, V = [], [], []
    for i in range(n):
        I.append([])
        J.append([])
        V.append([])
    cumsz = numpy.cumsum(sizes)
    import bisect

    for i, j, v in zip(mat.I, mat.J, mat.V):
        block = bisect.bisect(cumsz, j)
        I[block].append(i)
        V[block].append(v)
        if block == 0:
            J[block].append(j)
        else:
            J[block].append(j - cumsz[block - 1])
    return [
        _spmatrix(V[k], I[k], J[k], (mat.size[0], sz))
        for k, sz in enumerate(sizes)
    ]


def _break_rows(mat, sizes):
    """Help with the reading of CBF files."""
    n = len(sizes)
    I, J, V = [], [], []
    for i in range(n):
        I.append([])
        J.append([])
        V.append([])
    cumsz = numpy.cumsum(sizes)
    import bisect

    for i, j, v in zip(mat.I, mat.J, mat.V):
        block = bisect.bisect(cumsz, i)
        J[block].append(j)
        V[block].append(v)
        if block == 0:
            I[block].append(i)
        else:
            I[block].append(i - cumsz[block - 1])
    return [
        _spmatrix(V[k], I[k], J[k], (sz, mat.size[1]))
        for k, sz in enumerate(sizes)
    ]


def _block_idx(i, sizes):
    """Help with the reading of CBF files."""
    # if there are blocks of sizes n1,...,nk and i is
    # the index of an element of the big vectorized variable,
    # returns the block of i and its index inside the sub-block.
    cumsz = numpy.cumsum(sizes)
    import bisect

    block = bisect.bisect(cumsz, i)
    return block, (i if block == 0 else i - cumsz[block - 1])


def _read_cbf_block(P, blocname, f, parsed_blocks):
    """Help with the reading of CBF files."""
    if blocname == "OBJSENSE":
        objsense = f.readline().split()[0].lower()
        P.set_objective(objsense, P.objective.normalized.function)
        return None
    elif blocname == "PSDVAR":
        n = int(f.readline())
        vardims = []
        XX = []
        for i in range(n):
            ni = int(f.readline())
            vardims.append(ni)
            Xi = P.add_variable("X[" + str(i) + "]", (ni, ni), "symmetric")
            XX.append(Xi)
            P.add_constraint(Xi >> 0)
        return vardims, XX
    elif blocname == "VAR":
        Nscalar, ncones = [int(fi) for fi in f.readline().split()]
        tot_dim = 0
        var_structure = []
        xx = []
        for i in range(ncones):
            lsplit = f.readline().split()
            tp, dim = lsplit[0], int(lsplit[1])
            tot_dim += dim
            var_structure.append(dim)
            if tp == "F":
                xi = P.add_variable("x[" + str(i) + "]", dim)
            elif tp == "L+":
                xi = P.add_variable("x[" + str(i) + "]", dim, lower=0)
            elif tp == "L-":
                xi = P.add_variable("x[" + str(i) + "]", dim, upper=0)
            elif tp == "L=":
                xi = P.add_variable("x[" + str(i) + "]", dim, lower=0, upper=0)
            elif tp == "Q":
                xi = P.add_variable("x[" + str(i) + "]", dim)
                P.add_constraint(abs(xi[1:]) < xi[0])
            elif tp == "QR":
                xi = P.add_variable("x[" + str(i) + "]", dim)
                P.add_constraint(abs(xi[2:]) ** 2 < 2 * xi[0] * xi[1])
            xx.append(xi)
        if tot_dim != Nscalar:
            raise Exception("VAR dimensions do not match the header")
        return Nscalar, var_structure, xx
    elif blocname == "INT":
        n = int(f.readline())
        ints = {}
        for k in range(n):
            j = int(f.readline())
            i, col = _block_idx(j, parsed_blocks["VAR"][1])
            ints.setdefault(i, [])
            ints[i].append(col)
        x = parsed_blocks["VAR"][2]
        for i in ints:
            if len(ints[i]) == x[i].size[0]:
                x[i].vtype = "integer"
            else:
                x.append(
                    P.add_variable(
                        "x_int[" + str(i) + "]", len(ints[i]), "integer"
                    )
                )
                for k, j in enumerate(ints[i]):
                    P.add_constraint(x[i][j] == x[-1][k])
        return x
    elif blocname == "CON":
        Ncons, ncones = [int(fi) for fi in f.readline().split()]
        cons_structure = []
        tot_dim = 0
        for i in range(ncones):
            lsplit = f.readline().split()
            tp, dim = lsplit[0], int(lsplit[1])
            tot_dim += dim
            cons_structure.append((tp, dim))
        if tot_dim != Ncons:
            raise Exception("CON dimensions do not match the header")
        return Ncons, cons_structure
    elif blocname == "PSDCON":
        n = int(f.readline())
        psdcons_structure = []
        for i in range(n):
            ni = int(f.readline())
            psdcons_structure.append(ni)
        return psdcons_structure
    elif blocname == "OBJACOORD":
        n = int(f.readline())
        J = []
        V = []
        for i in range(n):
            lsplit = f.readline().split()
            j, v = int(lsplit[0]), float(lsplit[1])
            J.append(j)
            V.append(v)
        return _spmatrix(V, [0] * len(J), J, (1, parsed_blocks["VAR"][0]))
    elif blocname == "OBJBCOORD":
        return float(f.readline())
    elif blocname == "OBJFCOORD":
        n = int(f.readline())
        Fobj = [
            _spmatrix([], [], [], (ni, ni)) for ni in parsed_blocks["PSDVAR"][0]
        ]
        for k in range(n):
            lsplit = f.readline().split()
            j, row, col, v = (
                int(lsplit[0]),
                int(lsplit[1]),
                int(lsplit[2]),
                float(lsplit[3]),
            )
            Fobj[j][row, col] = v
            if row != col:
                Fobj[j][col, row] = v
        return Fobj
    elif blocname == "FCOORD":
        n = int(f.readline())
        Fblocks = {}
        for k in range(n):
            lsplit = f.readline().split()
            i, j, row, col, v = (
                int(lsplit[0]),
                int(lsplit[1]),
                int(lsplit[2]),
                int(lsplit[3]),
                float(lsplit[4]),
            )
            if i not in Fblocks:
                Fblocks[i] = [
                    _spmatrix([], [], [], (ni, ni))
                    for ni in parsed_blocks["PSDVAR"][0]
                ]
            Fblocks[i][j][row, col] = v
            if row != col:
                Fblocks[i][j][col, row] = v
        return Fblocks
    elif blocname == "ACOORD":
        n = int(f.readline())
        J = []
        V = []
        I = []
        for k in range(n):
            lsplit = f.readline().split()
            i, j, v = int(lsplit[0]), int(lsplit[1]), float(lsplit[2])
            I.append(i)
            J.append(j)
            V.append(v)
        return _spmatrix(
            V, I, J, (parsed_blocks["CON"][0], parsed_blocks["VAR"][0])
        )
    elif blocname == "BCOORD":
        n = int(f.readline())
        V = []
        I = []
        for k in range(n):
            lsplit = f.readline().split()
            i, v = int(lsplit[0]), float(lsplit[1])
            I.append(i)
            V.append(v)
        return _spmatrix(V, I, [0] * len(I), (parsed_blocks["CON"][0], 1))
    elif blocname == "HCOORD":
        n = int(f.readline())
        Hblocks = {}
        for k in range(n):
            lsplit = f.readline().split()
            i, j, row, col, v = (
                int(lsplit[0]),
                int(lsplit[1]),
                int(lsplit[2]),
                int(lsplit[3]),
                float(lsplit[4]),
            )
            if j not in Hblocks:
                Hblocks[j] = [
                    _spmatrix([], [], [], (ni, ni))
                    for ni in parsed_blocks["PSDCON"]
                ]
            Hblocks[j][i][row, col] = v
            if row != col:
                Hblocks[j][i][col, row] = v
        return Hblocks
    elif blocname == "DCOORD":
        n = int(f.readline())
        Dblocks = [
            _spmatrix([], [], [], (ni, ni)) for ni in parsed_blocks["PSDCON"]
        ]
        for k in range(n):
            lsplit = f.readline().split()
            i, row, col, v = (
                int(lsplit[0]),
                int(lsplit[1]),
                int(lsplit[2]),
                float(lsplit[3]),
            )
            Dblocks[i][row, col] = v
            if row != col:
                Dblocks[i][col, row] = v
        return Dblocks
    else:
        raise Exception("unexpected block name")


[docs]def import_cbf(filename): """Create a :class:`~picos.Problem` from a CBF file. The created problem contains one (multidimensional) variable for each cone specified in the section ``VAR`` of the .cbf file, and one (multidimmensional) constraint for each cone specified in the sections ``CON`` and ``PSDCON``. :returns: A tuple ``(P, x, X, params)`` where - ``P`` is the imported picos :class:`~picos.Problem` object, - ``x`` is a list of multidimensional variables representing the scalar variables found in the file, - ``X`` is a list of symmetric variables representing the positive semidefinite variables found in the file, and - ``params`` is a dictionary containing PICOS cosntants used to define the problem. Indexing is with respect to the blocks of variables as defined in the sections ``VAR`` and ``CON`` of the file. """ P = Problem() try: f = open(filename, "r") except IOError: filename += ".cbf" f = open(filename, "r") line = f.readline() while not line.startswith("VER"): line = f.readline() ver = int(f.readline()) if ver != 1: warnings.warn("CBF file has a version other than 1.") structure_keywords = ["OBJSENSE", "PSDVAR", "VAR", "INT", "PSDCON", "CON"] data_keywords = [ "OBJFCOORD", "OBJACOORD", "OBJBCOORD", "FCOORD", "ACOORD", "BCOORD", "HCOORD", "DCOORD", ] structure_mode = True # still parsing structure blocks seen_blocks = [] parsed_blocks = {} while True: line = f.readline() if not line: break lsplit = line.split() if lsplit and lsplit[0] in structure_keywords: if lsplit[0] == "INT" and ("VAR" not in seen_blocks): raise Exception("INT BLOCK before VAR BLOCK") if lsplit[0] == "CON" and not ( "VAR" in seen_blocks or "PSDVAR" in seen_blocks ): raise Exception("CON BLOCK before VAR/PSDVAR BLOCK") if lsplit[0] == "PSDCON" and not ( "VAR" in seen_blocks or "PSDVAR" in seen_blocks ): raise Exception("PSDCON BLOCK before VAR/PSDVAR BLOCK") if lsplit[0] == "VAR" and ( "CON" in seen_blocks or "PSDCON" in seen_blocks ): raise Exception("VAR BLOCK after CON/PSDCON BLOCK") if lsplit[0] == "PSDVAR" and ( "CON" in seen_blocks or "PSDCON" in seen_blocks ): raise Exception("PSDVAR BLOCK after CON/PSDCON BLOCK") if structure_mode: parsed_blocks[lsplit[0]] = _read_cbf_block( P, lsplit[0], f, parsed_blocks ) seen_blocks.append(lsplit[0]) else: raise Exception("Structure keyword after first data item") if lsplit and lsplit[0] in data_keywords: if "OBJSENSE" not in seen_blocks: raise Exception("missing OBJSENSE block") if not ("VAR" in seen_blocks or "PSDVAR" in seen_blocks): raise Exception("missing VAR/PSDVAR block") if lsplit[0] in ("OBJFCOORD", "FCOORD") and not ( "PSDVAR" in seen_blocks ): raise Exception("missing PSDVAR block") if lsplit[0] in ("OBJACOORD", "ACOORD", "HCOORD") and not ( "VAR" in seen_blocks ): raise Exception("missing VAR block") if lsplit[0] in ("DCOORD", "HCOORD") and not ( "PSDCON" in seen_blocks ): raise Exception("missing PSDCON block") structure_mode = False parsed_blocks[lsplit[0]] = _read_cbf_block( P, lsplit[0], f, parsed_blocks ) seen_blocks.append(lsplit[0]) f.close() # variables if "VAR" in parsed_blocks: Nvars, varsz, x = parsed_blocks["VAR"] else: x = None if "INT" in parsed_blocks: x = parsed_blocks["INT"] if "PSDVAR" in parsed_blocks: psdsz, X = parsed_blocks["PSDVAR"] else: X = None # objective obj_constant = parsed_blocks.get("OBJBCOORD", 0) bobj = Constant("bobj", obj_constant) obj = Constant("bobj", obj_constant) aobj = {} if "OBJACOORD" in parsed_blocks: obj_vecs = _break_cols(parsed_blocks["OBJACOORD"], varsz) aobj = {} for k, v in enumerate(obj_vecs): if v: aobj[k] = Constant("c[" + str(k) + "]", v) obj += aobj[k] * x[k] Fobj = {} if "OBJFCOORD" in parsed_blocks: Fbl = parsed_blocks["OBJFCOORD"] for i, Fi in enumerate(Fbl): if Fi: Fobj[i] = Constant("F[" + str(i) + "]", Fi) obj += Fobj[i] | X[i] P.set_objective(P.objective.normalized.direction, obj) # cone constraints bb = {} AA = {} FF = {} if "CON" in parsed_blocks: Ncons, structcons = parsed_blocks["CON"] szcons = [s for tp, s in structcons] b = parsed_blocks.get("BCOORD", _spmatrix([], [], [], (Ncons, 1))) bvecs = _break_rows(b, szcons) consexp = [] for i, bi in enumerate(bvecs): bb[i] = Constant("b[" + str(i) + "]", bi) consexp.append(Constant("b[" + str(i) + "]", bi)) A = parsed_blocks.get("ACOORD", _spmatrix([], [], [], (Ncons, Nvars))) Ablc = _break_rows(A, szcons) for i, Ai in enumerate(Ablc): Aiblocs = _break_cols(Ai, varsz) for j, Aij in enumerate(Aiblocs): if Aij: AA[i, j] = Constant("A[" + str((i, j)) + "]", Aij) consexp[i] += AA[i, j] * x[j] Fcoords = parsed_blocks.get("FCOORD", {}) for k, mats in Fcoords.items(): i, row = _block_idx(k, szcons) row_exp = AffineExpression.zero() for j, mat in enumerate(mats): if mat: FF[i, j, row] = Constant("F[" + str((i, j, row)) + "]", mat) row_exp += FF[i, j, row] | X[j] consexp[i] += ( "e_" + str(row) + "(" + str(szcons[i]) + ",1)" ) * row_exp for i, (tp, sz) in enumerate(structcons): if tp == "F": continue elif tp == "L-": P.add_constraint(consexp[i] < 0) elif tp == "L+": P.add_constraint(consexp[i] > 0) elif tp == "L=": P.add_constraint(consexp[i] == 0) elif tp == "Q": P.add_constraint(abs(consexp[i][1:]) < consexp[i][0]) elif tp == "QR": P.add_constraint( abs(consexp[i][2:]) ** 2 < 2 * consexp[i][0] * consexp[i][1] ) else: raise Exception("unexpected cone type") DD = {} HH = {} if "PSDCON" in parsed_blocks: Dblocks = parsed_blocks.get( "DCOORD", [_spmatrix([], [], [], (ni, ni)) for ni in parsed_blocks["PSDCON"]], ) Hblocks = parsed_blocks.get("HCOORD", {}) consexp = [] for i, Di in enumerate(Dblocks): DD[i] = Constant("D[" + str(i) + "]", Di) consexp.append(Constant("D[" + str(i) + "]", Di)) for j, Hj in Hblocks.items(): i, col = _block_idx(j, varsz) for k, Hij in enumerate(Hj): if Hij: HH[k, i, col] = Constant("H[" + str((k, i, col)) + "]", Hij) consexp[k] += HH[k, i, col] * x[i][col] for exp in consexp: P.add_constraint(exp >> 0) params = { "aobj": aobj, "bobj": bobj, "Fobj": Fobj, "A": AA, "b": bb, "F": FF, "D": DD, "H": HH, } return P, x, X, params
# -------------------------------------- __all__ = api_end(_API_START, globals())