import ast
from typing import Callable, List, Optional, Union, Dict, Tuple, Any
import numpy as np
from scipy.optimize import curve_fit
from Orange.data import Table, Domain, ContinuousVariable, StringVariable
from Orange.data.filter import HasClass
from Orange.data.util import get_unique_names
from Orange.preprocess import RemoveNaNColumns, Impute
from Orange.regression import Learner, Model
__all__ = ["CurveFitLearner"]
class CurveFitModel(Model):
    """
    Model returned by `CurveFitLearner`: a function and its fitted parameters.

    Parameters
    ----------
    domain : Domain
        Passed to `Model.__init__` as the model's domain.
    original_domain : Domain
        Passed to `Model.__init__` as the original domain.
    parameters_names : list of str
        Names of the fitted parameters.
    parameters : np.ndarray
        Fitted values; they are passed to the function positionally, after
        the data matrix.
    function : callable, optional
        The modeling function, called as ``function(X, *parameters)``.
        If None, the function is rebuilt from `create_lambda_args`.
    create_lambda_args : dict, optional
        Keyword arguments for `_create_lambda`. Required for pickling and
        for rebuilding the function when `function` is None.
    """

    def __init__(
            self,
            domain: Domain,
            original_domain: Domain,
            parameters_names: List[str],
            parameters: np.ndarray,
            function: Optional[Callable],
            create_lambda_args: Optional[Dict]
    ):
        super().__init__(domain, original_domain)
        self.__parameters_names = parameters_names
        self.__parameters = parameters

        if function is None and create_lambda_args is not None:
            # Unpickling path: the lambda itself is never stored, only the
            # arguments needed to build it again.
            function, names, _ = _create_lambda(**create_lambda_args)
            assert parameters_names == names

        assert function

        self.__function = function
        self.__create_lambda_args = create_lambda_args

    @property
    def coefficients(self) -> Table:
        """Table with one row per parameter: value in "coef", name in meta."""
        return Table(Domain([ContinuousVariable("coef")],
                            metas=[StringVariable("name")]),
                     self.__parameters[:, None],
                     metas=np.array(self.__parameters_names)[:, None])

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Evaluate the fitted function on `X`.

        Returns a flat array. A scalar result (a function that does not use
        any feature) is repeated ``len(X)`` times.
        """
        predicted = self.__function(X, *self.__parameters)
        if not isinstance(predicted, np.ndarray) or predicted.ndim == 0:
            # handle constant function; i.e. len(self.domain.attributes) == 0
            # (0-d arrays are scalars too and must be broadcast, not
            # flattened into a single-element array)
            return np.full(len(X), predicted, dtype=float)
        return predicted.flatten()

    def __getstate__(self) -> Dict:
        """
        Return the picklable state.

        Raises
        ------
        AttributeError
            If the model was built from a callable (nothing to rebuild the
            function from).
        """
        if not self.__create_lambda_args:
            raise AttributeError(
                "Can't pickle/copy callable. Use str expression instead."
            )
        return {
            "domain": self.domain,
            "original_domain": self.original_domain,
            "parameters_names": self.__parameters_names,
            "parameters": self.__parameters,
            "function": None,
            "args": self.__create_lambda_args,
        }

    def __setstate__(self, state: Dict):
        """Re-run `__init__` from a state produced by `__getstate__`."""
        # Look the entries up by key instead of relying on the dict's
        # insertion order matching the positional order of __init__.
        self.__init__(
            state["domain"],
            state["original_domain"],
            state["parameters_names"],
            state["parameters"],
            state["function"],
            state["args"],
        )
class CurveFitLearner(Learner):
    """
    Fit a function to data.

    The optimal parameter values are found with `scipy.optimize.curve_fit`.

    Parameters
    ----------
    expression : callable or str
        A modeling function.
        If callable, it must take the independent variable as the first
        argument and the parameters to fit as separate remaining arguments.
        If string, a lambda function is created,
        using `expression`, `available_feature_names`, `functions` and `env`
        attributes.
        Should be string for pickling the model.
    parameters_names : list of str
        List of parameters names. Only needed when the expression
        is callable.
    features_names : list of str
        List of features names. Only needed when the expression
        is callable.
    available_feature_names : list of str
        List of all available features names. Only needed when the expression
        is string. Needed to distinguish between parameters and features when
        translating the expression into the lambda.
    functions : list of str
        List of all available functions. Only needed when the expression
        is string. Needed to distinguish between parameters and functions when
        translating the expression into the lambda.
    sanitizer : callable
        Function for sanitizing names.
    env : dict
        An environment to capture in the lambda's closure.
    p0 : list of floats or dict, optional
        Initial guess for the parameters. A dict maps parameter names to
        guesses; parameters missing from it start at 1.
    bounds : 2-tuple of array_like or dict, optional
        Lower and upper bounds on parameters. A dict maps parameter names to
        (lower, upper) pairs; parameters missing from it are unbounded.
    preprocessors : tuple of Orange preprocessors, optional
        The processors that will be used when data is passed to the learner.

    Examples
    --------
    >>> import numpy as np
    >>> from Orange.data import Table
    >>> from Orange.regression import CurveFitLearner
    >>> data = Table("housing")
    >>> # example with callable expression
    >>> cfun = lambda x, a, b, c: a * np.exp(-b * x[:, 0] * x[:, 1]) + c
    >>> learner = CurveFitLearner(cfun, ["a", "b", "c"], ["CRIM", "LSTAT"])
    >>> model = learner(data)
    >>> pred = model(data)
    >>> coef = model.coefficients
    >>> # example with str expression
    >>> sfun = "a * exp(-b * CRIM * LSTAT) + c"
    >>> names = [a.name for a in data.domain.attributes]
    >>> learner = CurveFitLearner(sfun, available_feature_names=names,
    ...                           functions=["exp"])
    >>> model = learner(data)
    >>> pred = model(data)
    >>> coef = model.coefficients

    """
    preprocessors = [HasClass(), RemoveNaNColumns(), Impute()]
    __returns__ = CurveFitModel

    name = "Curve Fit"

    def __init__(
            self,
            expression: Union[Callable, ast.Expression, str],
            parameters_names: Optional[List[str]] = None,
            features_names: Optional[List[str]] = None,
            available_feature_names: Optional[List[str]] = None,
            functions: Optional[List[str]] = None,
            sanitizer: Optional[Callable] = None,
            env: Optional[Dict[str, Any]] = None,
            p0: Union[List, Dict, None] = None,
            bounds: Union[Tuple, Dict] = (-np.inf, np.inf),
            preprocessors=None
    ):
        super().__init__(preprocessors)

        if callable(expression):
            # A ready-made function cannot be inspected, so the caller has
            # to name its parameters and the features it reads.
            required = (("parameters_names", parameters_names),
                        ("features_names", features_names))
            for label, value in required:
                if value is None:
                    raise TypeError(f"Provide '{label}' parameter.")
            function, args = expression, None
        else:
            # The expression is translated into a lambda; names are
            # classified with the help of the two lists below.
            required = (("available_feature_names", available_feature_names),
                        ("functions", functions))
            for label, value in required:
                if value is None:
                    raise TypeError(f"Provide '{label}' parameter.")
            args = {
                "expression": expression,
                "available_feature_names": available_feature_names,
                "functions": functions,
                "sanitizer": sanitizer,
                "env": env,
            }
            function, parameters_names, features_names = \
                _create_lambda(**args)

        if isinstance(p0, dict):
            # per-name guesses -> positional list; unlisted parameters get 1
            p0 = [p0.get(par, 1) for par in parameters_names]

        if isinstance(bounds, dict):
            # per-name (lower, upper) pairs -> (lowers, uppers)
            unbounded = [-np.inf, np.inf]
            pairs = [bounds.get(par, unbounded) for par in parameters_names]
            bounds = [pair[0] for pair in pairs], [pair[1] for pair in pairs]

        self.__function = function
        self.__parameters_names = parameters_names
        self.__features_names = features_names
        self.__p0 = p0
        self.__bounds = bounds

        # needed for pickling - if the expression is a lambda function, the
        # learner is not picklable
        self.__create_lambda_args = args

    @property
    def parameters_names(self) -> List[str]:
        """Names of the parameters being fitted."""
        return self.__parameters_names

    def fit_storage(self, data: Table) -> CurveFitModel:
        """
        Fit the function on `data` and return the resulting model.

        Only attributes whose names are among the features names are kept,
        in the order they have in ``data.domain``.

        Raises
        ------
        ValueError
            If one of the used attributes is not continuous.
        """
        domain: Domain = data.domain
        used = [attr for attr in domain.attributes
                if attr.name in self.__features_names]
        if not all(attr.is_continuous for attr in used):
            raise ValueError("Numeric feature expected.")

        new_domain = Domain(used, domain.class_vars, domain.metas)
        transformed = data.transform(new_domain)
        fitted = curve_fit(self.__function, transformed.X, transformed.Y,
                           p0=self.__p0, bounds=self.__bounds)
        # curve_fit returns (optimal parameters, covariance, ...)
        params = fitted[0]
        return CurveFitModel(new_domain, domain,
                             self.__parameters_names, params, self.__function,
                             self.__create_lambda_args)

    def __getstate__(self) -> Dict:
        """
        Return the keyword arguments needed to re-create the learner.

        Raises
        ------
        AttributeError
            If the learner was built from a callable expression.
        """
        if not self.__create_lambda_args:
            raise AttributeError(
                "Can't pickle/copy callable. Use str expression instead."
            )
        return {
            **self.__create_lambda_args,
            "parameters_names": None,
            "features_names": None,
            "p0": self.__p0,
            "bounds": self.__bounds,
            "preprocessors": self.preprocessors,
        }

    def __setstate__(self, state: Dict):
        """Re-run `__init__` with the arguments stored by `__getstate__`."""
        # "expression" is positional in __init__, the rest are keywords
        self.__init__(state.pop("expression"), **state)
def _create_lambda(
        expression: Union[str, ast.Expression] = "",
        available_feature_names: List[str] = None,
        functions: List[str] = None,
        sanitizer: Callable = None,
        env: Optional[Dict[str, Any]] = None
) -> Tuple[Callable, List[str], List[str]]:
    """
    Create a lambda function from a string expression.

    The lambda takes the data matrix as its first argument, followed by the
    parameters. Feature names in the expression are replaced by column
    lookups into that matrix; column ``i`` is the ``i``-th name of the
    returned `vars_`.

    Parameters
    ----------
    expression : str or ast.Expression
        Right side of a modeling function. An AST passed in is not modified.
    available_feature_names : list of str
        List of all available features names.
        Needed to distinguish between parameters, features and functions.
        None is treated as an empty list.
    functions : list of str
        List of all available functions.
        Needed to distinguish between parameters, features and functions.
        None is treated as an empty list.
    sanitizer : callable, optional
        Function for sanitizing variable names.
    env : dict, optional
        An environment to capture in the lambda's closure. If omitted,
        every name in `functions` is looked up in numpy. The dict itself is
        not modified.

    Returns
    -------
    func : callable
        The created lambda function.
    params : list of str
        The recognized parameters within the expression, in order of first
        appearance.
    vars_ : list of str
        The recognized variables within the expression, in the order of
        `available_feature_names`.

    Examples
    --------
    >>> from Orange.data import Table
    >>> data = Table("housing")
    >>> sfun = "a * exp(-b * CRIM * LSTAT) + c"
    >>> names = [a.name for a in data.domain.attributes]
    >>> func, par, var = _create_lambda(sfun, available_feature_names=names,
    ...                                 functions=["exp"], env={"exp": np.exp})
    >>> y = func(data.X, 1, 2, 3)
    >>> par
    ['a', 'b', 'c']
    >>> var
    ['CRIM', 'LSTAT']

    """
    # pylint: disable=import-outside-toplevel
    from copy import deepcopy

    if sanitizer is None:
        sanitizer = lambda n: n
    if available_feature_names is None:
        available_feature_names = []
    if functions is None:
        functions = []
    if env is None:
        env = {name: getattr(np, name) for name in functions}
    else:
        # eval() inserts "__builtins__" into the globals dict it receives;
        # work on a copy so the caller's dict stays as it was.
        env = dict(env)

    if isinstance(expression, ast.AST):
        # ast.parse hands an AST back unchanged and the transformer below
        # rewrites nodes in place - keep the caller's tree intact so that
        # it can be used to build the lambda again.
        expression = deepcopy(expression)
    exp = ast.parse(expression, mode="eval")

    sanitized_names = [sanitizer(name) for name in available_feature_names]
    search = _ParametersSearch(sanitized_names, functions)
    search.visit(exp)
    params = search.parameters
    used_sanitized_feature_names = set(search.variables)

    # Used features, kept in the order of available_feature_names.
    vars_ = [name
             for name, clean in zip(available_feature_names, sanitized_names)
             if clean in used_sanitized_feature_names]
    # Column index of a feature is its position in vars_, so the lambda
    # and the returned list always agree (the order in which features show
    # up in the expression is irrelevant).
    feature_mapper: Dict[str, int] = {}
    used_clean = (clean for clean in sanitized_names
                  if clean in used_sanitized_feature_names)
    for i, clean in enumerate(used_clean):
        feature_mapper.setdefault(clean, i)

    name = get_unique_names(params, "x")
    exp = _ReplaceVars(name, feature_mapper, functions).visit(exp)

    lambda_ = ast.Lambda(
        args=ast.arguments(
            posonlyargs=[],
            args=[ast.arg(arg=arg) for arg in [name] + params],
            vararg=None,
            kwonlyargs=[],
            kw_defaults=[],
            kwarg=None,
            defaults=[],
        ),
        body=exp.body
    )
    exp = ast.Expression(body=lambda_)
    ast.fix_missing_locations(exp)

    # NOTE: the expression is compiled and evaluated as Python code; it
    # must come from a trusted source.
    # pylint: disable=eval-used
    return eval(compile(exp, "<lambda>", mode="eval"), env), params, vars_
class _ParametersSearch(ast.NodeVisitor):
    """
    Collect the features and the parameters used in an expression.

    Every `ast.Name` node is classified by its identifier:
     - feature: the identifier is listed in `vars_names`
     - function: the identifier is listed in `functions` (ignored)
     - parameter: anything else

    Parameters
    ----------
    vars_names : list of str
        List of all available features names.
    functions : list of str
        List of all available functions.

    Attributes
    ----------
    parameters : list of str
        Used parameters, in order of first appearance, without repeats.
    variables : list of str
        Used features, in order of first appearance, without repeats.
    """

    def __init__(self, vars_names: List[str], functions: List[str]):
        super().__init__()
        self.__vars_names = vars_names
        self.__functions = functions
        self.__parameters: List[str] = []
        self.__variables: List[str] = []

    @property
    def parameters(self) -> List[str]:
        return self.__parameters

    @property
    def variables(self) -> List[str]:
        return self.__variables

    def visit_Name(self, node: ast.Name) -> ast.Name:
        """Record the identifier of `node`; the node is returned as is."""
        identifier = node.id

        # Feature names win over function names; functions are skipped.
        if identifier in self.__vars_names:
            seen = self.__variables
        elif identifier in self.__functions:
            return node
        else:
            seen = self.__parameters

        # lists rather than sets: the order of first appearance matters
        if identifier not in seen:
            seen.append(identifier)
        return node
class _ReplaceVars(ast.NodeTransformer):
    """
    Replace feature names with X[:, i], where i is index of feature.

    Parameters
    ----------
    name : str
        Name of the variable that is subscripted (the ``X`` in ``X[:, i]``).
    vars_mapper : dict
        Dictionary of used features names and the belonging column index.
    functions : list of str
        List of all available functions. Names listed here are never
        replaced, even if they also appear in `vars_mapper`.
    """

    def __init__(self, name: str, vars_mapper: Dict, functions: List):
        super().__init__()
        self.__name = name
        self.__vars_mapper = vars_mapper
        self.__functions = functions

    def visit_Name(self, node: ast.Name) -> Union[ast.Name, ast.Subscript]:
        """Return ``name[:, i]`` for a mapped feature, else `node` itself."""
        if node.id not in self.__vars_mapper or node.id in self.__functions:
            return node

        index = self.__vars_mapper[node.id]
        # Let the parser build the subscript: the node classes used for
        # slices and literals differ between Python versions (ast.Num,
        # ast.Index and ast.ExtSlice are deprecated), while the parsed
        # template is always what the running interpreter expects.
        subscript = ast.parse(f"_[:, {int(index)}]", mode="eval").body
        # The identifier is set afterwards because `name` is not required
        # to be something the parser would accept.
        subscript.value.id = self.__name
        subscript.ctx = node.ctx
        return ast.copy_location(subscript, node)
if __name__ == "__main__":
    # Demo: fit a * exp(-b * LSTAT) + c on the housing data and draw the
    # fitted curve over the observations.
    import matplotlib.pyplot as plt

    housing = Table("housing")
    # Column of housing.X drawn on the x axis - presumably LSTAT, the
    # feature the learner is fitted on (index is hard-coded; verify).
    column = 12

    def decay(x, a, b, c):
        return a * np.exp(-b * x[:, 0]) + c

    learner = CurveFitLearner(decay, ["a", "b", "c"], ["LSTAT"])
    model = learner(housing)
    fitted = model(housing)

    observed_x = housing.X[:, column]
    plt.plot(observed_x, housing.Y, "o")

    # sort by x so that the fitted values are drawn as one continuous line
    order = np.argsort(observed_x)
    plt.plot(observed_x[order], fitted[order])
    plt.show()