Source-code analysis of the ml LogisticRegression in MLlib


Introduction

MLlib contains two LR implementations, one in mllib and one in ml. This post analyzes the ml version, which is the package MLlib will continue to focus on, while the RDD-based mllib package is gradually being phased out. The ml logistic regression is implemented in org.apache.spark.ml.classification.LogisticRegression. Before digging into the code, let's first look at the tunable parameters of LR. [For now, the analysis is based on the 1.6.1 code.]

1. Parameters of the LR model

Parameters (a usage sketch follows the list):

  • threshold: if the probability of label 1 is greater than threshold, predict 1; otherwise predict 0.
  • regParam: the regularization parameter (corresponds to $ \lambda $).
  • elasticNetParam: the ElasticNet mixing parameter $ \alpha $. If $ \alpha = 0 $, the penalty is an L2 penalty; if $ \alpha = 1 $, it is an L1 penalty; for $ 0 < \alpha < 1 $, it is a mix of L1 and L2. Default 0.0, i.e. an L2 penalty.
  • maxIter: maximum number of iterations. Default 100.
  • tol: convergence tolerance of the iterations. A smaller value gives higher precision at the cost of more iterations. Default 1E-6.
  • fitIntercept: whether to fit an intercept term. Default true.
  • standardization: whether to standardize the training features before fitting the model. The coefficients are always returned on the original scale, so this is transparent to the user. Note: with or without standardization, the model should converge to the same solution when no regularization is applied. The default in R's GLMNET package is also true. Default true.
  • weightCol: if not set or empty, all instances get a weight of 1; if set, it assigns per-instance weights, e.g. for unbalanced data. Not set by default.
  • treeAggregate: the depth of the treeAggregate call; if the feature dimensionality or the number of partitions is large, this should be increased. Default 2.
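A minimal sketch of setting these parameters through the ml API (the setter names are from the public 1.6 LogisticRegression API; training is an assumed DataFrame with label/features columns):

import org.apache.spark.ml.classification.LogisticRegression

// training is a hypothetical DataFrame with "label" and "features" columns.
val lr = new LogisticRegression()
  .setRegParam(0.1)            // lambda
  .setElasticNetParam(0.0)     // alpha: 0.0 => pure L2 penalty
  .setMaxIter(100)
  .setTol(1e-6)
  .setFitIntercept(true)
  .setStandardization(true)
  .setThreshold(0.5)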

2. Principle

The theory behind logistic regression is not covered in detail here. In short, the model predicts y = sigmoid(∑wx + b). The corresponding loss and gradient are shown below (arrows denote vectors):

\[l_{total}(\vec{w}, \vec{x})=l_{model}(\vec{w}, \vec{x})+l_{reg}(\vec{w})\] \[\vec{G} (\vec{w}, \vec{x})_{total}= \vec{G} (\vec{w}, \vec{x})_{model} + \vec{G} (\vec{w})_{reg}\]

$l_{model}$ is the cross-entropy; $l_{reg}$ is the regularization term.

The first part, $l_{model}$, depends on the training data; the second part, the regularization term, does not. The two parts are therefore computed independently in the implementation.

The loss and gradient of each training sample are independent of those of the other samples.
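Because of this independence, both quantities decompose into sums over the samples, which is what makes the distributed aggregation described below possible:

\[l_{model}(\vec{w}, \vec{x})=\sum_{k=1}^{N}l_k(\vec{w}, \vec{x}_k) \qquad \vec{G}(\vec{w}, \vec{x})_{model}=\sum_{k=1}^{N}\vec{G}_k(\vec{w}, \vec{x}_k)\]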

Gradient with respect to coefficient $w_i$ (summed over the N samples):

\[(\vec{G}(\vec{w}, \vec{x})_{model})_i= G_i(\vec{w}, \vec{x})= \frac{\partial{l(\vec{w}, \vec{x})}}{\partial{w_i}}=\sum_{k=1}^{N}\left(\frac{\exp(\vec{x}_{k} \vec{w})}{1+\exp(\vec{x}_k \vec{w})} - y_{k}\right) x_{ki}\]

Loss over the samples (the negative log-likelihood):

\[l(\vec{w}, \vec{x})_{model}=-\sum_{k=1}^{N}\left[y_{k} \vec{x}_{k} \vec{w} - \log(1+\exp(\vec{x}_k \vec{w}))\right]\]
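To connect this with the implementation below, define margin = $-(\vec{x}_k \vec{w} + b)$. The per-sample cross-entropy can then be written in the numerically stable branch form used by LogisticAggregator:

\[l_k = \begin{cases} \log(1+\exp(margin)) & y_k = 1 \\ \log(1+\exp(margin)) - margin & y_k = 0 \end{cases}\]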

In Spark, the ml logistic regression implements this via treeAggregate: the loss and gradient are computed over the RDD partitions on the executors/workers, then reduced on the driver/controller, summing over all training samples to produce lossSum and gradientSum.

The optimization itself runs on the single driver machine. The L1 penalty is handled by the OWLQN optimizer and the L2 penalty by L-BFGS. Both optimizers are provided by the Breeze library (which also provides the Vector/Matrix implementations and the corresponding linear-algebra interfaces in breeze.linalg).
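A simplified sketch of the treeAggregate pattern used here (with toy types and a plain (lossSum, gradientSum) accumulator instead of the real LogisticAggregator; the actual code also uses MLUtils.log1pExp for numerical stability):

import org.apache.spark.rdd.RDD

// Each partition folds its samples into a (lossSum, gradientSum) pair via seqOp;
// partial results are then combined pairwise (combOp) up a tree of the given depth.
def aggregateLossGrad(
    data: RDD[(Double, Array[Double])],   // (label, features) -- illustrative only
    weights: Array[Double],
    depth: Int = 2): (Double, Array[Double]) = {

  val zero = (0.0, Array.fill(weights.length)(0.0))

  val seqOp = (acc: (Double, Array[Double]), point: (Double, Array[Double])) => {
    val (label, features) = point
    val margin = -features.zip(weights).map { case (x, w) => x * w }.sum
    val multiplier = 1.0 / (1.0 + math.exp(margin)) - label
    val grad = acc._2.clone()
    features.indices.foreach(i => grad(i) += multiplier * features(i))
    val loss =
      if (label > 0) math.log1p(math.exp(margin))
      else math.log1p(math.exp(margin)) - margin
    (acc._1 + loss, grad)
  }

  val combOp = (a: (Double, Array[Double]), b: (Double, Array[Double])) =>
    (a._1 + b._1, a._2.zip(b._2).map { case (x, y) => x + y })

  data.treeAggregate(zero)(seqOp, combOp, depth)
}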


3. Loss and gradient

The core logic is in the LogisticCostFun class. See the source for details; only the core code and comments are shown here, and they can be matched against the formulas above.

private class LogisticCostFun(
    instances: RDD[Instance],
    numClasses: Int,
    fitIntercept: Boolean,
    standardization: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double],
    regParamL2: Double) extends DiffFunction[BDV[Double]] {

  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
    val numFeatures = featuresStd.length
    val coeffs = Vectors.fromBreeze(coefficients)

    // step 1: the cost part.
    val logisticAggregator = {
      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
      val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)

      // First apply seqOp on every RDD partition, i.e. add() (accumulate the loss/gradient of each sample);
      // then apply combOp on the driver, i.e. merge() (obtain the total loss/gradient).
      instances.treeAggregate(
        new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
      )(seqOp, combOp)
    }

    // The total gradient array.
    val totalGradientArray = logisticAggregator.gradient.toArray

    // step 2: the L2 regularization part => regVal.
    // regVal = regParamL2 / 2 * ∑ wi^2 (only the L2 part; the L1 part is handled by the OWLQN optimizer).
    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
    val regVal = if (regParamL2 == 0.0) {
      0.0
    } else {
      var sum = 0.0
      
      coeffs.foreachActive { (index, value) =>
        // If `fitIntercept` is true, the last term which is intercept doesn't
        // contribute to the regularization.
        if (index != numFeatures) {
          // The following code will compute the loss of the regularization; also
          // the gradient of the regularization, and add back to totalGradientArray.
          // totalGradientArray: gradient of the cost term + gradient of the regularization term
          // sum: the L2 regularization sum
          sum += {
             
            if (standardization) {
              
              totalGradientArray(index) += regParamL2 * value
              value * value
            } else {
              // 
              if (featuresStd(index) != 0.0) {
                // If `standardization` is false, we still standardize the data
                // to improve the rate of convergence; as a result, we have to
                // perform this reverse standardization by penalizing each component
                // differently to get effectively the same objective function when
                // the training dataset is not standardized.
                val temp = value / (featuresStd(index) * featuresStd(index))
                totalGradientArray(index) += regParamL2 * temp
                value * temp
              } else {
                0.0
              }
            }
          }
        }
      }

      0.5 * regParamL2 * sum
    }

    // Return the loss with the L2 penalty added, and the gradient with the L2 penalty's gradient added.
    (logisticAggregator.loss + regVal, new BDV(totalGradientArray))
  }
}

This involves another class, LogisticAggregator. Its role is essentially the map/reduce computation of the loss/gradient.

private class LogisticAggregator(
    coefficients: Vector,
    numClasses: Int,
    fitIntercept: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double]) extends Serializable {

  private var weightSum = 0.0
  private var lossSum = 0.0

  // coefficients => coefficientsArray (the underlying array).
  private val coefficientsArray = coefficients match {
    case dv: DenseVector => dv.values
    case _ =>
      throw new IllegalArgumentException(
        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
  }

  // Feature dimensionality (the intercept, if fitted, is excluded).
  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length

  // The accumulated gradient sum (gradientSumArray).
  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)

  /**
   * 
   * Add a new training instance to this LogisticAggregator, and update the loss and gradient
   * of the objective function.
   *
   * @param instance The instance of data point to be added.
   * @return This LogisticAggregator object.
   */
  def add(instance: Instance): this.type = {
    instance match { case Instance(label, weight, features) =>
      require(dim == features.size, s"Dimensions mismatch when adding new instance." +
        s" Expecting $dim but got ${features.size}.")
      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

      if (weight == 0.0) return this

      val localCoefficientsArray = coefficientsArray
      val localGradientSumArray = gradientSumArray

      numClasses match {
        case 2 =>
          // For Binary Logistic Regression.
          // step 1: binary case; compute z = ∑ w*x + b (in the standardized space) and negate it.
          val margin = - {
            var sum = 0.0

            // a. Accumulate ∑ w * (x / std) over the active feature indices
            // (the features are standardized here by dividing by their std).
            features.foreachActive { (index, value) =>
              if (featuresStd(index) != 0.0 && value != 0.0) {
                sum += localCoefficientsArray(index) * (value / featuresStd(index))
              }
            }

            // This adds the intercept b.
            sum + {
              if (fitIntercept) localCoefficientsArray(dim) else 0.0
            }
          }

          // Labels here are encoded as (1, 0).
          // step 2: multiplier = instance weight * (P(y=1|x) - label); the weight handles unbalanced data (default 1).
          val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)

          // step 3: update the local gradient sum.
          // gradientSum += multiplier * x (the division by weightSum happens later in gradient()).
          features.foreachActive { (index, value) =>
            if (featuresStd(index) != 0.0 && value != 0.0) {
              localGradientSumArray(index) += multiplier * (value / featuresStd(index))
            }
          }

          if (fitIntercept) {
            localGradientSumArray(dim) += multiplier
          }

          // step 4: accumulate the cross-entropy loss:
          // for a positive label, loss = log(1 + exp(margin)); otherwise loss = log(1 + exp(margin)) - margin.
          if (label > 0) {
            // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
            lossSum += weight * MLUtils.log1pExp(margin)
          } else {
            lossSum += weight * (MLUtils.log1pExp(margin) - margin)
          }
        case _ =>
          new NotImplementedError("LogisticRegression with ElasticNet in ML package " +
            "only supports binary classification for now.")
      }

      // 
      weightSum += weight
      this
    }
  }

  /**
   * Merge: this is what enables the distributed (combOp) aggregation.
   *
   * Merge another LogisticAggregator, and update the loss and gradient
   * of the objective function.
   * (Note that it's in place merging; as a result, `this` object will be modified.)
   *
   * @param other The other LogisticAggregator to be merged.
   * @return This LogisticAggregator object.
   */
  def merge(other: LogisticAggregator): this.type = {
    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
      s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")

    if (other.weightSum != 0.0) {
      weightSum += other.weightSum
      lossSum += other.lossSum

      var i = 0
      val localThisGradientSumArray = this.gradientSumArray
      val localOtherGradientSumArray = other.gradientSumArray
      val len = localThisGradientSumArray.length

      // Add the other aggregator's gradient sums into this (local) one, coefficient by coefficient.
      while (i < len) {
        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
        i += 1
      }
    }
    this
  }

  // The final loss: the accumulated loss divided by weightSum.
  def loss: Double = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    lossSum / weightSum
  }

  // The final gradient: the accumulated gradient sum divided by weightSum.
  def gradient: Vector = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    val result = Vectors.dense(gradientSumArray.clone())
    scal(1.0 / weightSum, result)
    result
  }
}

4. Training process

  override protected def train(dataset: DataFrame): LogisticRegressionModel = {
    // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))

    // Select labelCol, weightCol and featuresCol as the row.
    val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
      case Row(label: Double, weight: Double, features: Vector) =>
        Instance(label, weight, features)
    }

    // Persist the training instances if the input dataset is not already persisted
    // (MEMORY_AND_DISK: spills to disk when memory is insufficient). This can affect performance.
    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

    // Summary statistics over the features and labels.
    val (summarizer, labelSummarizer) = {
      val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
        instance: Instance) =>
          (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))

      val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
        c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
          (c1._1.merge(c2._1), c1._2.merge(c2._2))

      instances.treeAggregate(
        new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
    }

    val histogram = labelSummarizer.histogram
    val numInvalid = labelSummarizer.countInvalid
    val numClasses = histogram.length
    val numFeatures = summarizer.mean.size

    if (numInvalid != 0) {
      val msg = s"Classification labels should be in {0 to ${numClasses - 1} " +
        s"Found $numInvalid invalid labels."
      logError(msg)
      throw new SparkException(msg)
    }

    if (numClasses > 2) {
      val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " +
        s"binary classification. Found $numClasses in the input dataset."
      logError(msg)
      throw new SparkException(msg)
    }

    // Mean and std of each feature.
    val featuresMean = summarizer.mean.toArray
    val featuresStd = summarizer.variance.toArray.map(math.sqrt)

    // L1 and L2 regularization strengths.
    val regParamL1 = $(elasticNetParam) * $(regParam)
    val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

    // The cost function.
    val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), $(standardization),
      featuresStd, featuresMean, regParamL2)

    // The optimizer.
    // If elasticNetParam == 0 or regParam == 0, i.e. pure L2 regularization or no regularization => use LBFGS.
    // Otherwise (L1 or mixed L1 + L2 regularization) => use OWLQN.
    val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
      new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
    } else {
      def regParamL1Fun = (index: Int) => {
        // Remove the L1 penalization on the intercept
        if (index == numFeatures) {
          0.0
        } else {
          if ($(standardization)) {
            regParamL1
          } else {
            // If `standardization` is false, we still standardize the data
            // to improve the rate of convergence; as a result, we have to
            // perform this reverse standardization by penalizing each component
            // differently to get effectively the same objective function when
            // the training dataset is not standardized.
            if (featuresStd(index) != 0.0) regParamL1 / featuresStd(index) else 0.0
          }
        }
      }
      new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
    }

    // Initialize the coefficients to zero.
    val initialCoefficientsWithIntercept =
      Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)

    // For binary classification, when the coefficients are initialized to zero, convergence is faster
    // if the intercept is initialized from the label distribution: b = log(P(1)/P(0)).
    if ($(fitIntercept)) {
      /*
         For binary logistic regression, when we initialize the coefficients as zeros,
         it will converge faster if we initialize the intercept such that
         it follows the distribution of the labels.

         
         P(0) = 1 / (1 + \exp(b)), and
         P(1) = \exp(b) / (1 + \exp(b))
         , hence
         
         b = \log{P(1) / P(0)} = \log{count_1 / count_0}
         
       */
      initialCoefficientsWithIntercept.toArray(numFeatures) =
        math.log(histogram(1) / histogram(0))
    }

    // Start the iterations.
    val states = optimizer.iterations(new CachedDiffFunction(costFun),
      initialCoefficientsWithIntercept.toBreeze.toDenseVector)

    // Collect the iteration results.
    val (coefficients, intercept, objectiveHistory) = {
      /*
         Note that in Logistic Regression, the objective history (loss + regularization)
         is log-likelihood which is invariance under feature standardization. As a result,
         the objective history from optimizer is the same as the one in the original space.
       */
      val arrayBuilder = mutable.ArrayBuilder.make[Double]
      var state: optimizer.State = null
      while (states.hasNext) {
        state = states.next()
        arrayBuilder += state.adjustedValue
      }

      if (state == null) {
        val msg = s"${optimizer.getClass.getName} failed."
        logError(msg)
        throw new SparkException(msg)
      }

      /*
         The coefficients are trained in the scaled space; we're converting them back to
         the original space.
         Note that the intercept in scaled space and original space is the same;
         as a result, no scaling is needed.
       */
      val rawCoefficients = state.x.toArray.clone()
      var i = 0
      while (i < numFeatures) {
        rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
        i += 1
      }

      // 
      if ($(fitIntercept)) {
        (Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
          arrayBuilder.result())
      } else {
        (Vectors.dense(rawCoefficients).compressed, 0.0, arrayBuilder.result())
      }
    }

    if (handlePersistence) instances.unpersist()

    // Build the model and attach the training summary.
    val model = copyValues(new LogisticRegressionModel(uid, coefficients, intercept))
    val logRegSummary = new BinaryLogisticRegressionTrainingSummary(
      model.transform(dataset),
      $(probabilityCol),
      $(labelCol),
      $(featuresCol),
      objectiveHistory)
    model.setSummary(logRegSummary)
  }
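As a user-level counterpart to the train() walkthrough above, a minimal usage sketch (again assuming a DataFrame named training with label/features columns):

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
  .setRegParam(0.01)
  .setElasticNetParam(1.0)   // alpha = 1.0 => pure L1, so OWLQN is used internally

val model = lr.fit(training)

println(model.coefficients)   // already mapped back to the original (unstandardized) scale
println(model.intercept)
println(model.summary.objectiveHistory.mkString(", "))   // (loss + regularization) per iteration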

5. Regularization

  • L2 regularization -> ridge
  • L1 regularization -> lasso
  • mix L1 and L2 -> elastic Net

The corresponding formulas (ridge, lasso, elastic net):

\[l_{reg}(\vec{w})=\frac{\lambda}{2}\sum_{i=1}^{N}{w_{i}}^2\] \[l_{reg}(\vec{w})=\lambda\sum_{i=1}^{N}|w_i|\] \[l_{reg}(\vec{w})=\lambda\sum_{i=1}^{N}\left(\alpha|w_i|+\frac{1-\alpha}{2}{w_i}^2\right)\]

This corresponds to the following lines in the code:

// regParam = regParamL1 + regParamL2
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
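For instance, a hypothetical setting of regParam = 0.3 and elasticNetParam = 0.2 splits the penalty as:

val regParam = 0.3
val elasticNetParam = 0.2
val regParamL1 = elasticNetParam * regParam          // 0.06, passed to OWLQN as the L1 strength
val regParamL2 = (1.0 - elasticNetParam) * regParam  // 0.24, added to the loss in LogisticCostFun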

Of the two regularization methods, L2 regularization corresponds to assuming a Gaussian prior on the model parameters; the L2 penalty is smoother than L1 and therefore easier to optimize. L1 corresponds to a Laplace prior; it tends to produce sparse solutions and can therefore be used for feature selection.

LBFGS and OWLQN

So the core of the model essentially comes down to these two methods: LBFGS and OWLQN. Both are variants of Newton's method, whose core update is:

\[\vec{w}_{n+1}=\vec{w}_n-H^{-1}\vec{G}\]

The computation of the Hessian matrix is not explained here; if you are interested in the mathematical details, you can also look at the implementations of these two methods in the breeze library:

L-BFGS: Limited-memory BFGS, where BFGS stands for the names of its four authors: Broyden–Fletcher–Goldfarb–Shanno. OWL-QN: Orthant-Wise Limited-memory Quasi-Newton.
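A small, self-contained sketch of how a Breeze optimizer consumes a DiffFunction (minimizing a toy quadratic here, not the LR objective; the constructor arguments follow the same (maxIter, m, tolerance) order seen in the train() code above):

import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, LBFGS}

// Toy objective f(x) = ||x - 3||^2 with gradient 2 * (x - 3).
val f = new DiffFunction[DenseVector[Double]] {
  def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
    val diff = x - DenseVector.fill(x.length)(3.0)
    (diff dot diff, diff * 2.0)
  }
}

val lbfgs = new LBFGS[DenseVector[Double]](100, 10, 1e-6)    // maxIter, m, tolerance
val xMin = lbfgs.minimize(f, DenseVector.zeros[Double](5))   // converges to a vector of 3.0s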

A quick note on the breeze library:

breeze is a library for numerical computing. Its goal is to be generic, concise and powerful without sacrificing too much performance; the current version is 0.12. Spark's MLlib, including the frequently seen linear-algebra library breeze.linalg and the optimization library breeze.optimize, is built on top of it. It also provides plotting functionality (breeze.plot).
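A few illustrative breeze.linalg operations, just to show the flavor of the API (not taken from the LR code):

import breeze.linalg.{DenseMatrix, DenseVector}

val v = DenseVector(1.0, 2.0, 3.0)
val m = DenseMatrix((1.0, 0.0, 0.0), (0.0, 2.0, 0.0), (0.0, 0.0, 3.0))

val mv = m * v       // matrix-vector product: DenseVector(1.0, 4.0, 9.0)
val s  = v dot v     // inner product: 14.0
val w  = v * 2.0     // scalar scaling: DenseVector(2.0, 4.0, 6.0)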

Summary

That covers the whole flow. Whether to use L1 or L2 depends on your tuning. A few additional remarks (a small sketch follows the list):

  • By default the features are standardized. For some scenarios (e.g. recommendation) with discretized features, standardization is not meaningful and may even hurt the results.
  • The intercept b is initialized from the positive/negative label ratio as log(P(1)/P(0)), which makes convergence faster.
  • The caching mechanism (StorageLevel.MEMORY_AND_DISK): when memory is insufficient, performance may suffer, which is a drawback.
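For the first and third points, a hedged sketch of what the adjustment might look like in user code (standard DataFrame/ml API calls, nothing specific to the LR internals):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.storage.StorageLevel

// Caching the training data up front lets you pick the storage level yourself;
// train() checks dataset.rdd.getStorageLevel and skips its own MEMORY_AND_DISK persist
// when the input is already persisted. setStandardization(false) skips the internal
// feature scaling, e.g. for already-discretized (one-hot) features.
val cached = training.persist(StorageLevel.MEMORY_ONLY)

val lr = new LogisticRegression()
  .setStandardization(false)

val model = lr.fit(cached)
cached.unpersist()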

References:

1. LogisticRegression source code
2. breeze
3. breeze documentation
