# Source code for RankingTrainValidationSplit

# Copyright (C) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in project root for information.


import sys

if sys.version >= '3':
    basestring = str

from pyspark.ml.param.shared import *
from mmlspark.Utils import *
from pyspark.ml.common import inherit_doc
from pyspark.ml import Model
from pyspark.ml.util import *
from multiprocessing.pool import ThreadPool

import numpy as np
import pyspark
import pyspark.sql.functions as F
import sys
from mmlspark.RankingSplit import *
from mmlspark.RankingAdapter import *
from pyspark import keyword_only
from pyspark.ml import Estimator
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.tuning import ValidatorParams
from pyspark.ml.util import *
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
from pyspark.ml.param.shared import HasParallelism

if sys.version >= '3':
    basestring = str


def _parallelFitTasks(est, train, eva, validation, epm, collectSubModel, collectSubMetrics):
    """
    Creates a list of callables which can be called from different threads to fit and evaluate
    an estimator in parallel. Each callable returns an `(index, metric)` pair.

    :param est: Estimator, the estimator to be fit.
    :param train: DataFrame, training data set, used for fitting.
    :param eva: Evaluator, used to compute `metric`
    :param validation: DataFrame, validation data set, used for evaluation.
    :param epm: Sequence of ParamMap, params maps to be used during fitting & evaluation.
    :param collectSubModel: Whether to collect sub model.
    :return: (int, float, subModel), an index into `epm` and the associated metric value.
    """
    modelIter = est.fitMultiple(train, epm)

    def singleTask():
        index, model = next(modelIter)
        df = model.transform(validation, epm[index])
        metric = eva.evaluate(df)
        metrics = eva.getMetricsMap(df)
        return index, metric, model if collectSubModel else None, metrics if collectSubMetrics else None

    return [singleTask] * len(epm)

class HasCollectSubMetrics(Params):
    """
    Mixin for param collectSubMetrics: whether to collect the per-model metrics
    maps produced while tuning. When False (the default) only the aggregate
    metric per candidate is kept.
    """

    collectSubMetrics = Param(Params._dummy(), "collectSubMetrics",
                              "Param for whether to collect a list of sub-models metrics.",
                              typeConverter=TypeConverters.toBoolean)

    def __init__(self):
        super(HasCollectSubMetrics, self).__init__()
        # Collecting per-model metrics is opt-in.
        self._setDefault(collectSubMetrics=False)

    def setCollectSubMetrics(self, value):
        """
        Sets the value of :py:attr:`collectSubMetrics`.
        """
        return self._set(collectSubMetrics=value)

    def getCollectSubMetrics(self):
        """
        Gets the value of collectSubMetrics or its default value.
        """
        return self.getOrDefault(self.collectSubMetrics)
class HasCollectSubModels(Params):
    """
    Mixin for param collectSubModels: controls whether every candidate model
    fitted during tuning is retained. Off by default; enabling it can exhaust
    driver memory for large models.
    """

    collectSubModels = Param(Params._dummy(), "collectSubModels",
                             "Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.",
                             typeConverter=TypeConverters.toBoolean)

    def __init__(self):
        super(HasCollectSubModels, self).__init__()
        self._setDefault(collectSubModels=False)

    def setCollectSubModels(self, value):
        """
        Sets the value of :py:attr:`collectSubModels`.
        """
        return self._set(collectSubModels=value)

    def getCollectSubModels(self):
        """
        Gets the value of collectSubModels or its default value.
        """
        return self.getOrDefault(self.collectSubModels)
@inherit_doc
class RankingTrainValidationSplit(Estimator, ValidatorParams, HasCollectSubModels, HasCollectSubMetrics,
                                  HasParallelism):
    """
    Tunes a recommender by splitting the input ratings into train and validation
    sets, fitting one model per candidate ParamMap in parallel, and selecting
    the model whose ranking metric on the validation set is best.
    """

    trainRatio = Param(Params._dummy(), "trainRatio",
                       "Param for ratio between train and validation data. Must be between 0 and 1.",
                       typeConverter=TypeConverters.toFloat)

    userCol = Param(Params._dummy(), "userCol",
                    "userCol: column name for user ids. Ids must be within the integer value range. (default: user)")

    ratingCol = Param(Params._dummy(), "ratingCol",
                      "ratingCol: column name for ratings (default: rating)")

    itemCol = Param(Params._dummy(), "itemCol",
                    "itemCol: column name for item ids. Ids must be within the integer value range. (default: item)")

    def setTrainRatio(self, value):
        """
        Sets the value of :py:attr:`trainRatio`.
        """
        return self._set(trainRatio=value)

    def getTrainRatio(self):
        """
        Gets the value of trainRatio or its default value.
        """
        return self.getOrDefault(self.trainRatio)

    def setItemCol(self, value):
        """
        Args:
            itemCol (str): column name for item ids. Ids must be within the integer value range. (default: item)
        """
        self._set(itemCol=value)
        return self

    def getItemCol(self):
        """
        Returns:
            str: column name for item ids. Ids must be within the integer value range. (default: item)
        """
        return self.getOrDefault(self.itemCol)

    def setRatingCol(self, value):
        """
        Args:
            ratingCol (str): column name for ratings (default: rating)
        """
        self._set(ratingCol=value)
        return self

    def getRatingCol(self):
        """
        Returns:
            str: column name for ratings (default: rating)
        """
        return self.getOrDefault(self.ratingCol)

    def setUserCol(self, value):
        """
        Args:
            userCol (str): column name for user ids. Ids must be within the integer value range. (default: user)
        """
        self._set(userCol=value)
        return self

    def getUserCol(self):
        """
        Returns:
            str: column name for user ids. Ids must be within the integer value range. (default: user)
        """
        return self.getOrDefault(self.userCol)

    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, seed=None, trainRatio=0.8,
                 java=False):
        """
        __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, seed=None,\
                 trainRatio=0.8, java=False)
        """
        super(RankingTrainValidationSplit, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)
        # When True, _fit delegates to the JVM implementation instead of the
        # Python one below.
        self.java = java

    @keyword_only
    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, seed=None):
        """
        setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, seed=None):
        Sets params for the train validation split.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies creates a deep copy of
        the embedded paramMap, and copies the embedded and extra parameters over.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        newCV = Params.copy(self, extra)
        if self.isSet(self.estimator):
            newCV.setEstimator(self.getEstimator().copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newCV.setEvaluator(self.getEvaluator().copy(extra))
        return newCV

    def _create_model(self, java_model):
        """Wraps a JVM model handle in a Python RankingTrainValidationSplitModel."""
        # NOTE(review): relies on RankingTrainValidationSplitModel being
        # constructible without arguments — confirm its signature allows this.
        model = RankingTrainValidationSplitModel()
        model._java_obj = java_model
        model._transfer_params_from_java()
        return model

    def _fit(self, dataset):
        """
        Splits `dataset`, evaluates every ParamMap on the validation part in
        parallel, then refits the winner on the full dataset.

        :param dataset: DataFrame of ratings with user/item/rating columns.
        :return: fitted RankingTrainValidationSplitModel.
        """
        # Delegate to the Scala/JVM implementation when requested at construction.
        if self.java:
            return self._to_java().fit(dataset._jdf)

        recommender = self.getOrDefault(self.estimator)
        rating = recommender.getRatingCol()
        userColumn = recommender.getUserCol()
        itemColumn = recommender.getItemCol()
        eva = self.getOrDefault(self.evaluator)

        # Wrap the recommender so each fit also produces ranked predictions the
        # ranking evaluator can score (top-K for all users).
        est = RankingAdapter() \
            .setRecommender(recommender) \
            .setMode("allUsers") \
            .setK(eva.getK()) \
            .setUserCol(userColumn) \
            .setItemCol(itemColumn) \
            .setRatingCol(rating)

        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        tRatio = self.getOrDefault(self.trainRatio)

        # Monkey-patch the split helpers onto DataFrame so they can be chained
        # fluently in the pipeline below.
        pyspark.sql.DataFrame.min_rating_filter = RankingSplit.min_rating_filter
        pyspark.sql.DataFrame.stratified_split = RankingSplit.stratified_split

        # The split helpers expect the fixed column names customerID/itemID,
        # hence the rename round-trip around the split.
        temp_train, temp_validation = dataset \
            .dropDuplicates() \
            .withColumnRenamed(userColumn, 'customerID') \
            .withColumnRenamed(itemColumn, 'itemID') \
            .min_rating_filter(min_rating=6, by_customer=True) \
            .stratified_split(min_rating=3, by_customer=True, fixed_test_sample=False, ratio=tRatio)

        train = temp_train \
            .withColumnRenamed('customerID', userColumn) \
            .withColumnRenamed('itemID', itemColumn)
        validation = temp_validation \
            .withColumnRenamed('customerID', userColumn) \
            .withColumnRenamed('itemID', itemColumn)

        subModels = None
        collectSubModelsParam = self.getCollectSubModels()
        if collectSubModelsParam:
            subModels = [None for i in range(numModels)]

        subMetrics = None
        collectSubMetricsParam = self.getCollectSubMetrics()
        if collectSubMetricsParam:
            subMetrics = [None for i in range(numModels)]

        # Fit/evaluate each ParamMap on a thread pool; results arrive unordered,
        # so index them back by j.
        tasks = _parallelFitTasks(est, train, eva, validation, epm, collectSubModelsParam, collectSubMetricsParam)
        pool = ThreadPool(processes=min(self.getParallelism(), numModels))
        metrics = [None] * numModels
        for j, metric, subModel, subMetric in pool.imap_unordered(lambda f: f(), tasks):
            metrics[j] = metric
            if collectSubModelsParam:
                subModels[j] = subModel
            if collectSubMetricsParam:
                subMetrics[j] = subMetric

        train.unpersist()
        validation.unpersist()

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        # Refit the winning ParamMap on the full dataset before returning.
        bestModel = est.fit(dataset, epm[bestIndex])
        return self._copyValues(RankingTrainValidationSplitModel(bestModel, metrics, subModels, subMetrics))
@inherit_doc
class RankingTrainValidationSplitModel(Model, ValidatorParams):
    """
    Model produced by :py:class:`RankingTrainValidationSplit`, holding the best
    fitted model together with the validation metrics gathered during tuning.
    """

    def __init__(self, bestModel=None, validationMetrics=None, subModels=None, subMetrics=None):
        """
        :param bestModel: best model chosen on the validation split (None when
                          the instance is later populated from a JVM model).
        :param validationMetrics: list of metric values, one per ParamMap.
        :param subModels: fitted sub-models, if collection was enabled.
        :param subMetrics: per-model metrics maps, if collection was enabled.
        """
        super(RankingTrainValidationSplitModel, self).__init__()
        #: best model from train validation split
        self.bestModel = bestModel
        #: evaluated validation metrics
        # None-sentinel instead of a mutable `=[]` default, which would be
        # shared across every instance constructed without metrics.
        self.validationMetrics = [] if validationMetrics is None else validationMetrics
        self.subModels = subModels
        self.subMetrics = subMetrics

    def _transform(self, dataset):
        # Delegate transformation to the winning model.
        return self.bestModel.transform(dataset)

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.
        And, this creates a shallow copy of the validationMetrics.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        validationMetrics = list(self.validationMetrics)
        # Carry sub-models/metrics over (shallow) so a copy is not silently
        # poorer than the original.
        return RankingTrainValidationSplitModel(bestModel, validationMetrics, self.subModels, self.subMetrics)

    def recommendForAllUsers(self, numItems):
        """
        Returns top `numItems` items recommended for each user.
        """
        # NOTE(review): goes through the JVM handle, unlike recommendForAllItems
        # below — confirm bestModel always wraps a java object here.
        return self.bestModel._call_java("recommendForAllUsers", numItems)

    def recommendForAllItems(self, numItems):
        """
        Returns top `numItems` users recommended for each item.
        """
        return self.bestModel.recommendForAllItems(numItems)