代码之家  ›  专栏  ›  技术社区  ›  Paul Reiners

Spark LinearRegressionSummary“正常”摘要

  •  13
  • Paul Reiners  · 技术社区  · 7 年前

    根据 LinearRegressionSummary (Spark 2.1.0 JavaDoc) ，p值仅在使用 “normal” 求解器时可用。

    该值仅在使用“法线”解算器时可用。

    那么，“normal” 求解器到底是什么？

    我正在这样做:

    import org.apache.spark.ml.{Pipeline, PipelineModel} 
    import org.apache.spark.ml.evaluation.RegressionEvaluator 
    import org.apache.spark.ml.feature.VectorAssembler 
    import org.apache.spark.ml.regression.LinearRegressionModel 
    import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder} 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.{DataFrame, SparkSession}
        .
        .
        .
    // Project helper (not shown here): splits `output` into train/test
    // DataFrames according to testProportion.
    val (trainingData, testData): (DataFrame, DataFrame) = 
      com.acme.pta.accuracy.Util.splitData(output, testProportion)
        .
        .
        .
    // Linear regression with the "normal" (normal-equation) solver, which is
    // the only solver that can produce p-values / summary statistics.
    val lr = 
      new org.apache.spark.ml.regression.LinearRegression()
      .setSolver("normal").setMaxIter(maxIter)
    
    // Single-stage pipeline wrapping the regression estimator.
    val pipeline = new Pipeline()
      .setStages(Array(lr))
    
    // Hyper-parameter grid for cross-validation.
    // Fixed: "Array(0,6, ...)" used a comma instead of a decimal point, which
    // compiled to the five values 0, 6, 0.3, 0.1, 0.01 (Int literals widened
    // to Double) instead of the intended 0.6.
    val paramGrid = new ParamGridBuilder()
      .addGrid(lr.elasticNetParam, Array(0.2, 0.4, 0.8, 0.9))
      .addGrid(lr.regParam, Array(0.6, 0.3, 0.1, 0.01))
      .build()
    
    // Cross-validation over the pipeline and the hyper-parameter grid above.
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(numFolds) // Use 3+ in practice
    
    // Fit, then unwrap the best model: CrossValidatorModel -> PipelineModel
    // -> stage 0, which is the LinearRegressionModel.
    val cvModel: CrossValidatorModel = cv.fit(trainingData)
    
    val pipelineModel: PipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    val lrModel: LinearRegressionModel = 
      pipelineModel.stages(0).asInstanceOf[LinearRegressionModel]
    
    // The summary object itself is always present; only some of its fields
    // (pValues etc.) may throw, depending on the solver path taken.
    val modelSummary = lrModel.summary
    Holder.log.info("lrModel.summary: " + modelSummary)
    try {
      Holder.log.info("feature p values: ")
      // summary.pValues throws UnsupportedOperationException when the model
      // was not fitted via the normal-equation (Cholesky) path.
      features.zip(lrModel.summary.pValues).foreach {
        case (feature, pValue) =>
          Holder.log.info("feature: " + feature + ": " + pValue)
      }
    } catch {
      case _: java.lang.UnsupportedOperationException =>
        Holder.log.error("Cannot compute p-values")
    }
    

    我仍然得到 UnsupportedOperationException .

    异常消息为:

    还有什么我需要做的吗?我正在使用

      "org.apache.spark" %% "spark-mllib" % "2.1.1"
    

    该版本是否支持pValue?

    1 回复  |  直到 7 年前
        1
  •  14
  •   Dennis Tsoi    7 年前

    已更新

    tl;dr

    解决方案1

    对于使用 “normal” 求解器的 LinearRegression，只有当参数 elasticNetParam 或 regParam 之一为零时，才会有p值和其他 “normal” 统计信息。所以您可以把网格改为

    .addGrid( lr.elasticNetParam, Array( 0.0 ) )
    

    .addGrid( lr.regParam, Array( 0.0 ) )
    

    解决方案2

    制作 LinearRegression 的自定义版本，它：

    1. 对回归使用 “normal” 求解器。
    2. 使用 WeightedLeastSquares 的 Cholesky 求解器。

    我上这个课是为了 ml.regression 包裹

    package org.apache.spark.ml.regression
    
    import scala.collection.mutable
    
    import org.apache.spark.SparkException
    import org.apache.spark.internal.Logging
    import org.apache.spark.ml.feature.Instance
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.ml.optim.WeightedLeastSquares
    import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
    import org.apache.spark.ml.util._
    import org.apache.spark.mllib.linalg.VectorImplicits._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row}
    import org.apache.spark.sql.functions._
    
    /**
     * Drop-in replacement for Spark's LinearRegression that always solves the
     * normal equations with WeightedLeastSquares' Cholesky solver, so that
     * diagInvAtWA (and therefore pValues / tValues / coefficient standard
     * errors) is available even when both regParam and elasticNetParam are
     * non-zero.
     *
     * NOTE(review): must live in package org.apache.spark.ml.regression
     * because it relies on private[ml] / private[regression] Spark internals
     * (LinearRegressionParams, WeightedLeastSquares, setSummary, ...).
     */
    class CholeskyLinearRegression ( override val uid: String )
        extends Regressor[ Vector, CholeskyLinearRegression, LinearRegressionModel ]
        with LinearRegressionParams with DefaultParamsWritable with Logging {
    
        import CholeskyLinearRegression._
    
        /** No-arg constructor with a random UID, as required for ML persistence. */
        def this() = this(Identifiable.randomUID("linReg"))
    
        // Standard LinearRegression param setters, with the same defaults
        // Spark's own LinearRegression uses.
        def setRegParam(value: Double): this.type = set(regParam, value)
        setDefault(regParam -> 0.0)
    
        def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
        setDefault(fitIntercept -> true)
    
        def setStandardization(value: Boolean): this.type = set(standardization, value)
        setDefault(standardization -> true)
    
        def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value)
        setDefault(elasticNetParam -> 0.0)
    
        def setMaxIter(value: Int): this.type = set(maxIter, value)
        setDefault(maxIter -> 100)
    
        def setTol(value: Double): this.type = set(tol, value)
        setDefault(tol -> 1E-6)
    
        def setWeightCol(value: String): this.type = set(weightCol, value)
    
        // The solver param is kept only for API compatibility; train() below
        // ignores it and always takes the Cholesky path.
        def setSolver(value: String): this.type = set(solver, value)
        setDefault(solver -> Auto)
    
        def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
        setDefault(aggregationDepth -> 2)
    
        /**
         * Fits the model by normal equations, unconditionally using
         * WeightedLeastSquares with solverType = Cholesky (unlike Spark's
         * LinearRegression, which can fall back to QuasiNewton and thereby
         * lose diagInvAtWA).
         */
        override protected def train(dataset: Dataset[_]): LinearRegressionModel = {
    
            // Extract the number of features before deciding optimization solver.
            // NOTE(review): numFeatures is unused now that the solver choice is
            // hard-coded below.
            val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
            // Weight column defaults to a constant 1.0 when unset or empty.
            val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
    
            // One Instance(label, weight, features) per row.
            val instances: RDD[Instance] = 
                dataset
                .select( col( $(labelCol) ), w, col( $(featuresCol) ) )
                .rdd.map {
                    case Row(label: Double, weight: Double, features: Vector) =>
                    Instance(label, weight, features)
                }
    
            // if (($(solver) == Auto &&
            //   numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == Normal) {
            // For low dimensional data, WeightedLeastSquares is more efficient since the
            // training algorithm only requires one pass through the data. (SPARK-10668)
    
            // Forcing solverType = Cholesky is the whole point of this class.
            // NOTE(review): the positional `true` is presumably standardizeLabel —
            // confirm against the WeightedLeastSquares constructor signature.
            val optimizer = new WeightedLeastSquares( 
                $(fitIntercept), 
                $(regParam),
                elasticNetParam = $(elasticNetParam), 
                $(standardization), 
                true,
                solverType = WeightedLeastSquares.Cholesky, 
                maxIter = $(maxIter), 
                tol = $(tol)
            )
    
            val model = optimizer.fit(instances)
    
            // Build the public model and attach a training summary, mirroring
            // what Spark's own LinearRegression.train does.
            val lrModel = copyValues(new LinearRegressionModel(uid, model.coefficients, model.intercept))
            val (summaryModel, predictionColName) = lrModel.findSummaryModelAndPredictionCol()
    
            // diagInvAtWA comes from the Cholesky solution, so the summary can
            // compute pValues / tValues / standard errors.
            val trainingSummary = new LinearRegressionTrainingSummary(
                summaryModel.transform(dataset),
                predictionColName,
                $(labelCol),
                $(featuresCol),
                summaryModel,
                model.diagInvAtWA.toArray,
                model.objectiveHistory
            )
    
            lrModel
            .setSummary( Some( trainingSummary ) )
    
            lrModel
        }
    
        override def copy(extra: ParamMap): CholeskyLinearRegression = defaultCopy(extra)
    }
    
    /**
     * Companion object: MLReadable support plus the solver-name constants
     * mirrored from Spark's own LinearRegression companion.
     */
    object CholeskyLinearRegression extends DefaultParamsReadable[CholeskyLinearRegression] {

        /** String name for the "auto" solver. */
        private[regression] val Auto = "auto"

        /** String name for the "normal" solver. */
        private[regression] val Normal = "normal"

        /** String name for the "l-bfgs" solver. */
        private[regression] val LBFGS = "l-bfgs"

        /** Set of solvers that LinearRegression supports. */
        private[regression] val supportedSolvers = Array(Auto, Normal, LBFGS)

        /** Feature-count cap for the normal-equation solver. */
        val MAX_FEATURES_FOR_NORMAL_SOLVER: Int = WeightedLeastSquares.MAX_NUM_FEATURES

        override def load(path: String): CholeskyLinearRegression = super.load(path)
    }
    

    您所要做的就是将其粘贴到项目中的单独文件中，并在您的代码中把 LinearRegression 替换为 CholeskyLinearRegression。

    // Swap in the custom estimator; the rest of the pipeline stays unchanged.
    val lr = new CholeskyLinearRegression() // new LinearRegression()
            .setSolver( "normal" )
            .setMaxIter( maxIter )
    

    它与非零参数一起工作,并给出pValue . 在以下参数网格上测试。

    // Grid where BOTH regParam and elasticNetParam are non-zero — p-values
    // still work with CholeskyLinearRegression.
    val paramGrid = new ParamGridBuilder()
            .addGrid( lr.elasticNetParam, Array( 0.2, 0.4, 0.8, 0.9 ) )
            .addGrid( lr.regParam, Array( 0.6, 0.3, 0.1, 0.01 ) )
            .build()
    

    全面调查

    我最初认为主要问题是模型没有得到充分保存。拟合后不保留训练模型 CrossValidator . 这是可以理解的,因为内存消耗。有一个正在进行的 debate 应该如何解决。 Issue 在吉拉。

    您可以在下面被注释掉的代码中看到，我曾试图从最佳模型中提取参数以便重新拟合。后来我发现模型摘要本身是正常的，只是缺少某些参数（diagInvAtWA）。

    对于岭回归，即 Tikhonov 正则化（elasticNetParam = 0，搭配任意 regParam），可以计算p值和其他 “normal” 统计信息；但对于 Lasso 方法以及介于两者之间的弹性网络方法则无法计算。同样地，regParam = 0 搭配任意 elasticNetParam 也可以计算。

    为什么会这样

    LinearRegression 对 “normal” 求解器 uses 加权最小二乘优化器 WeightedLeastSquares，并设 solverType = WeightedLeastSquares.Auto。该优化器对内部求解器有 two options：QuasiNewton 或 Cholesky；只有当 regParam 和 elasticNetParam 都非零时才会选择 QuasiNewton。

    // Excerpt from Spark's WeightedLeastSquares: under Auto, QuasiNewton is
    // chosen whenever BOTH elasticNetParam and regParam are non-zero;
    // otherwise CholeskySolver is used.
    val solver = if (
        ( solverType == WeightedLeastSquares.Auto && 
            elasticNetParam != 0.0 && 
            regParam != 0.0 ) ||
        ( solverType == WeightedLeastSquares.QuasiNewton ) ) {
    
        ...
        new QuasiNewtonSolver(fitIntercept, maxIter, tol, effectiveL1RegFun)
    } else {
        new CholeskySolver
    }
    

    因此，对于您的参数网格，将始终使用 QuasiNewtonSolver，因为 regParam 和 elasticNetParam 都没有为零的取值。

    我们知道,为了得到Pvalue和其他“正态”统计量,例如系数的t统计量或标准误差,矩阵的对角线(A^t*W*A)^-1( diagInvAtWA )不能是只有一个零的向量。该条件在定义中设置 pValues .

    diagInvAtWA 是由 solution.aaInv 计算得到的：

    val diagInvAtWA = solution.aaInv.map { inv => ...
    

    对于 Cholesky solver 它会被 calculated 出来，而对于 QuasiNewton 则 not——NormalEquationSolution 中对应的参数正是这个矩阵。

    从技术上讲,你可以用

    复现（Reproduction）

    在这个例子中,我使用了数据 sample_linear_regression_data.txt 从…起 here .

    复制完整代码

    import org.apache.spark._
    
    import org.apache.spark.ml.{Pipeline, PipelineModel} 
    import org.apache.spark.ml.evaluation.{RegressionEvaluator, BinaryClassificationEvaluator}
    import org.apache.spark.ml.feature.VectorAssembler 
    import org.apache.spark.ml.regression.{LinearRegressionModel, LinearRegression}
    import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder} 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.ml.param.ParamMap
    
    object Main {

        /**
         * Fits a cross-validated linear regression on the libsvm sample data
         * and prints the p-values of the best model.
         */
        def main( args: Array[ String ] ): Unit = {

            val spark = SparkSession.builder()
                .appName( "SO" )
                .master( "local[*]" )
                .config( "spark.driver.host", "localhost" )
                .getOrCreate()

            import spark.implicits._

            val data = spark.read
                .format( "libsvm" )
                .load( "./sample_linear_regression_data.txt" )

            // 90/10 train/test split with a fixed seed for reproducibility.
            val Array( training, test ) = data.randomSplit( Array( 0.9, 0.1 ), seed = 12345 )

            val maxIter = 10

            val lr = new LinearRegression()
                .setSolver( "normal" )
                .setMaxIter( maxIter )

            // elasticNetParam is pinned to 0.0 so the Cholesky path is taken
            // and the "normal" statistics (p-values) remain available.
            // (The non-zero grid Array(0.2, 0.4, 0.8, 0.9) reproduces the failure.)
            val paramGrid = new ParamGridBuilder()
                .addGrid( lr.elasticNetParam, Array( 0.0 ) )
                .addGrid( lr.regParam, Array( 0.6, 0.3, 0.1, 0.01 ) )
                .build()

            val pipeline = new Pipeline()
                .setStages( Array( lr ) )

            val cv = new CrossValidator()
                .setEstimator( pipeline )
                .setEvaluator( new RegressionEvaluator )
                .setEstimatorParamMaps( paramGrid )
                .setNumFolds( 2 ) // Use 3+ in practice

            val cvModel = cv.fit( training )

            // Unwrap: best model -> PipelineModel -> stage 0 regression model.
            val lrModel = cvModel
                .bestModel
                .asInstanceOf[ PipelineModel ]
                .stages( 0 )
                .asInstanceOf[ LinearRegressionModel ]

            // Technically the exact ParamMap could be used to rebuild and refit
            // a fresh LinearRegression, but the summary on the best model is
            // sufficient here.

            val modelSummary = lrModel.summary

            println( "lrModel pValues: " + modelSummary.pValues.mkString( ", " ) )

            spark.stop()
        }
    }
    

    原始回答

    正如文档（available）中所说：

    coefficientStandardErrors、tValues 和 pValues 仅在使用 “normal” 求解器时可用，因为它们都基于 diagInvAtWA——矩阵 (A^T*W*A)^-1 的对角线。