Introduction
Spark has two logistic regression (LR) implementations, one in the mllib package and one in the ml package. This post analyzes the ml one, which is where future development effort is focused, while mllib will gradually fade away. The ml logistic regression is implemented in org.apache.spark.ml.classification.LogisticRegression. Before going through the code, let's first take a quick look at LR's tunable parameters. [This write-up is based on the 1.6.1 code.]
1. Parameters of the LR model
Parameters (a minimal configuration sketch follows the list):
- threshold: if the probability of label 1 is > threshold, predict 1, otherwise 0.
- regParam: regularization parameter (the $ \lambda $ below).
- elasticNetParam: the ElasticNet mixing parameter $ \alpha $. With $ \alpha = 0 $ the penalty is an L2 penalty; with $ \alpha = 1 $ it is an L1 penalty; with $ 0 < \alpha < 1 $ it is a combination of L1 and L2. Default 0.0, i.e. an L2 penalty.
- maxIter: maximum number of iterations; default 100.
- tol: convergence tolerance of the iterations. Smaller values give higher precision at the cost of more iterations. Default 1E-6.
- fitIntercept: whether to fit the intercept term; default true.
- standardization: whether to standardize the training features before fitting the model. The coefficients are always returned on the original scale, so this is transparent to users. Note that with or without standardization, the model should converge to the same solution when no regularization is applied. The default behavior in R's GLMNET package is also true. Default true.
- weightCol: if not set or empty, every instance gets weight 1. If set, it assigns per-instance weights, e.g. for unbalanced data. Not set by default.
- treeAggregate depth: if the feature dimension or the number of partitions is very large, this should be increased. Default 2.
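To make the parameters above concrete, here is a minimal sketch of configuring the ml estimator (the column names "label", "features" and "w" are assumptions for illustration):
import org.apache.spark.ml.classification.LogisticRegression
// A minimal configuration sketch (Spark 1.6 ml API); dataset and column names are hypothetical.
val lr = new LogisticRegression()
  .setLabelCol("label")          // label column, values in {0, 1}
  .setFeaturesCol("features")    // Vector-typed feature column
  .setWeightCol("w")             // optional per-instance weights
  .setFitIntercept(true)         // fit the intercept term b
  .setStandardization(true)      // standardize features internally
  .setRegParam(0.1)              // lambda
  .setElasticNetParam(0.0)       // alpha: 0 = L2, 1 = L1, (0, 1) = elastic net
  .setMaxIter(100)
  .setTol(1e-6)
  .setThreshold(0.5)             // P(label = 1) > 0.5 => predict 1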
2. How it works
The theory of logistic regression is not covered in detail here. In a nutshell: $P(y=1 \mid \vec{x}) = \sigma(\sum w x + b)$, where $\sigma(z) = 1/(1+e^{-z})$ is the logistic (sigmoid) function. The corresponding loss and gradient are given below (arrows denote vectors):
\[l_{total}(\vec{w}, \vec{x})=l_{model}(\vec{w}, \vec{x})+l_{reg}(\vec{w})\] \[\vec{G} (\vec{w}, \vec{x})_{total}= \vec{G} (\vec{w}, \vec{x})_{model} + \vec{G} (\vec{w})_{reg}\]Here $l_{model}$ is the cross-entropy loss and $l_{reg}$ is the regularization term.
The first part, $l_{model}$, depends on the training data; the second part, the regularization term, does not. The two parts are therefore computed independently in the implementation.
The loss and gradient contributions of the individual training samples are also independent of each other.
The i-th component of the model gradient, summed over the N training samples:
\[(\vec{G}(\vec{w}, \vec{x})_{model})_i = \frac{\partial{l_{model}(\vec{w}, \vec{x})}}{\partial{w_i}} = -\sum_{k=1}^{N}\left(y_{k} - \frac{\exp(\vec{x}_{k} \cdot \vec{w})}{1+\exp(\vec{x}_{k} \cdot \vec{w})}\right) x_{ki}\]and the model loss:
\[l_{model}(\vec{w}, \vec{x}) = -\sum_{k=1}^{N}\left(y_{k}\, \vec{x}_{k} \cdot \vec{w} - \log(1+\exp(\vec{x}_{k} \cdot \vec{w}))\right)\]In Spark, the ml logistic regression computes these with treeAggregate: the per-sample loss and gradient are computed over the RDD partitions on the executors/workers, and the partial results are reduced on the driver/controller, summing over all training samples to obtain lossSum and gradientSum.
Optimization is done on the single driver. When an L1 term is present it is handled by the OWLQN optimizer; the pure-L2 (or unregularized) case is handled by L-BFGS. Both optimizers come from the Breeze library (which also provides the Vector/Matrix implementations and the corresponding linear-algebra interfaces in breeze.linalg).
In short, the flow is: aggregate loss and gradient across the executors, then run the optimizer on the driver.
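To make the driver-side step concrete, here is a minimal, self-contained sketch of the Breeze pattern Spark relies on: the objective is exposed as a DiffFunction returning (loss, gradient), and an LBFGS instance minimizes it. The toy quadratic objective is an assumption for illustration, standing in for the real LogisticCostFun shown below.
import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, LBFGS}
// Toy objective f(x) = 0.5 * ||x - t||^2 with gradient (x - t); a stand-in for LogisticCostFun.
val target = DenseVector.fill(5)(3.0)
val f = new DiffFunction[DenseVector[Double]] {
  def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
    val diff = x - target
    (0.5 * (diff dot diff), diff)
  }
}
val lbfgs = new LBFGS[DenseVector[Double]](100, 10, 1e-6)   // maxIter, history size m, tolerance
val xOpt = lbfgs.minimize(f, DenseVector.zeros[Double](5))  // converges to target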
3. Loss and gradient
The work happens mainly in the LogisticCostFun class. See the source for the full details; only the core code and comments are pasted here, so you can match them against the formulas above.
private class LogisticCostFun(
instances: RDD[Instance],
numClasses: Int,
fitIntercept: Boolean,
standardization: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double],
regParamL2: Double) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val numFeatures = featuresStd.length
val coeffs = Vectors.fromBreeze(coefficients)
// Step 1: the data-dependent loss/gradient part.
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
// First apply seqOp on each RDD partition: add() accumulates each sample's loss/gradient.
// Then apply combOp on the driver: merge() combines the partial sums into the total loss/gradient.
instances.treeAggregate(
new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
)(seqOp, combOp)
}
// Total gradient array.
val totalGradientArray = logisticAggregator.gradient.toArray
// Step 2: the L2 regularization part => regVal.
// regVal = regParamL2 / 2 * ∑ wi^2  (regParamL2 already contains the (1 - alpha) * lambda factor).
// regVal is the sum of coefficients squares excluding intercept for L2 regularization.
val regVal = if (regParamL2 == 0.0) {
0.0
} else {
var sum = 0.0
coeffs.foreachActive { (index, value) =>
// If `fitIntercept` is true, the last term which is intercept doesn't
// contribute to the regularization.
if (index != numFeatures) {
// The following code will compute the loss of the regularization; also
// the gradient of the regularization, and add back to totalGradientArray.
// totalGradientArray: gradient of the cost term + gradient of the regularization term.
// sum: accumulates the squared-coefficient term of the L2 penalty.
sum += {
if (standardization) {
totalGradientArray(index) += regParamL2 * value
value * value
} else {
// Features with zero standard deviation carry no information; skip them.
if (featuresStd(index) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(index) * featuresStd(index))
totalGradientArray(index) += regParamL2 * temp
value * temp
} else {
0.0
}
}
}
}
}
0.5 * regParamL2 * sum
}
// Return the loss and the gradient, both including the L2 regularization term.
(logisticAggregator.loss + regVal, new BDV(totalGradientArray))
}
}
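A short derivation of the "reverse standardization" branch above: the coefficients are trained in the scaled feature space, where $w_i^{scaled} = w_i^{orig}\,\sigma_i$ with $\sigma_i$ = featuresStd(i). When standardization = false, the objective should penalize the original-space coefficients, so
\[\frac{regParamL2}{2}\sum_i \left(w_i^{orig}\right)^2 = \frac{regParamL2}{2}\sum_i \frac{\left(w_i^{scaled}\right)^2}{\sigma_i^2}, \qquad \frac{\partial}{\partial w_i^{scaled}} = regParamL2 \cdot \frac{w_i^{scaled}}{\sigma_i^2}\]which is exactly value * temp for the loss and regParamL2 * temp for the gradient, with temp = value / (featuresStd(index) * featuresStd(index)).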
This code uses another class, LogisticAggregator. Its role is essentially a map/reduce computation of the loss and gradient.
private class LogisticAggregator(
coefficients: Vector,
numClasses: Int,
fitIntercept: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double]) extends Serializable {
private var weightSum = 0.0
private var lossSum = 0.0
// Unpack coefficients into a raw array (dense vectors only).
private val coefficientsArray = coefficients match {
case dv: DenseVector => dv.values
case _ =>
throw new IllegalArgumentException(
s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
}
// Number of features (the intercept, if fitted, is stored as the last element).
private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length
// Running sums of the per-sample gradients.
private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)
/**
* Add a new training instance to this LogisticAggregator, and update the loss and gradient
* of the objective function.
*
* @param instance The instance of data point to be added.
* @return This LogisticAggregator object.
*/
def add(instance: Instance): this.type = {
instance match { case Instance(label, weight, features) =>
require(dim == features.size, s"Dimensions mismatch when adding new instance." +
s" Expecting $dim but got ${features.size}.")
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
if (weight == 0.0) return this
val localCoefficientsArray = coefficientsArray
val localGradientSumArray = gradientSumArray
numClasses match {
case 2 =>
// For Binary Logistic Regression.
// Step 1 (binary classification): compute margin = -(∑ w*x + b).
val margin = - {
var sum = 0.0
// a. Accumulate ∑ w*x over the active features.
// Note the features are standardized on the fly (value / featuresStd).
features.foreachActive { (index, value) =>
if (featuresStd(index) != 0.0 && value != 0.0) {
sum += localCoefficientsArray(index) * (value / featuresStd(index))
}
}
// Add the intercept b.
sum + {
if (fitIntercept) localCoefficientsArray(dim) else 0.0
}
}
// The labels are encoded as {1, 0}.
// Step 2: multiplier = weight * (sigma(w·x + b) - label); weight is the per-instance weight for unbalanced data (default 1).
val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
// Step 3: update the local gradient sums.
// Each sample contributes multiplier * x (the 1/weightSum scaling is applied later, in `gradient`).
features.foreachActive { (index, value) =>
if (featuresStd(index) != 0.0 && value != 0.0) {
localGradientSumArray(index) += multiplier * (value / featuresStd(index))
}
}
if (fitIntercept) {
localGradientSumArray(dim) += multiplier
}
// Step 4: accumulate the (weighted) cross-entropy loss in a numerically stable form:
// log1pExp(margin) when label = 1, and log1pExp(margin) - margin when label = 0.
if (label > 0) {
// The following is equivalent to log(1 + exp(margin)) but more numerically stable.
lossSum += weight * MLUtils.log1pExp(margin)
} else {
lossSum += weight * (MLUtils.log1pExp(margin) - margin)
}
case _ =>
new NotImplementedError("LogisticRegression with ElasticNet in ML package " +
"only supports binary classification for now.")
}
// Accumulate the total instance weight.
weightSum += weight
this
}
}
/**
* Merge step, used as the combOp of the distributed aggregation.
*
* Merge another LogisticAggregator, and update the loss and gradient
* of the objective function.
* (Note that it's in place merging; as a result, `this` object will be modified.)
*
* @param other The other LogisticAggregator to be merged.
* @return This LogisticAggregator object.
*/
def merge(other: LogisticAggregator): this.type = {
require(dim == other.dim, s"Dimensions mismatch when merging with another " +
s"LogisticAggregator. Expecting $dim but got ${other.dim}.")
if (other.weightSum != 0.0) {
weightSum += other.weightSum
lossSum += other.lossSum
var i = 0
val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
val len = localThisGradientSumArray.length
// Element-wise add the other aggregator's per-coefficient gradient sums into this one.
while (i < len) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
i += 1
}
}
this
}
// Final loss: weighted average over all instances.
def loss: Double = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but $weightSum.")
lossSum / weightSum
}
// Final gradient: weighted average over all instances.
def gradient: Vector = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but $weightSum.")
val result = Vectors.dense(gradientSumArray.clone())
scal(1.0 / weightSum, result)
result
}
}
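The margin/multiplier bookkeeping in add maps back to the formulas in section 2. With $z = \vec{w} \cdot \vec{x} + b$ and margin $= -z$:
\[\frac{1}{1+\exp(margin)} = \frac{1}{1+\exp(-z)} = \sigma(z) = P(y=1 \mid \vec{x})\]
\[multiplier = weight \cdot (\sigma(z) - y), \qquad loss = weight \cdot \begin{cases} \log(1+\exp(-z)) & y = 1 \\ \log(1+\exp(-z)) + z & y = 0 \end{cases}\]so gradientSumArray accumulates $weight \cdot (\sigma(z) - y)\, x_i$, matching the gradient formula above, and lossSum accumulates the weighted cross-entropy.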
4. Training process
override protected def train(dataset: DataFrame): LogisticRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
// Build Instance(label, weight, features) rows from labelCol, weightCol and featuresCol.
val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}
// Persist the training instances with MEMORY_AND_DISK if the input dataset is not already persisted
// (spills to disk when memory is insufficient, which can hurt performance).
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
// Compute feature statistics and the label histogram in one pass.
val (summarizer, labelSummarizer) = {
val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
instance: Instance) =>
(c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))
val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
}
val histogram = labelSummarizer.histogram
val numInvalid = labelSummarizer.countInvalid
val numClasses = histogram.length
val numFeatures = summarizer.mean.size
if (numInvalid != 0) {
val msg = s"Classification labels should be in {0 to ${numClasses - 1} " +
s"Found $numInvalid invalid labels."
logError(msg)
throw new SparkException(msg)
}
if (numClasses > 2) {
val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " +
s"binary classification. Found $numClasses in the input dataset."
logError(msg)
throw new SparkException(msg)
}
// Per-feature mean and standard deviation.
val featuresMean = summarizer.mean.toArray
val featuresStd = summarizer.variance.toArray.map(math.sqrt)
// Split regParam into its L1 and L2 parts.
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
// The objective function (data loss + L2 term).
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), $(standardization),
featuresStd, featuresMean, regParamL2)
// Optimizer selection: with elasticNetParam == 0 or regParam == 0 (pure L2, or no
// regularization) use L-BFGS; otherwise (L1, or L1 + L2) use OWLQN.
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
} else {
def regParamL1Fun = (index: Int) => {
// Remove the L1 penalization on the intercept
if (index == numFeatures) {
0.0
} else {
if ($(standardization)) {
regParamL1
} else {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
if (featuresStd(index) != 0.0) regParamL1 / featuresStd(index) else 0.0
}
}
}
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
}
// Initialize the coefficients to 0.
val initialCoefficientsWithIntercept =
Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)
// For binary classification with zero-initialized coefficients, convergence is faster when the
// intercept is initialized from the label distribution: b = log(P(1) / P(0)).
if ($(fitIntercept)) {
/*
For binary logistic regression, when we initialize the coefficients as zeros,
it will converge faster if we initialize the intercept such that
it follows the distribution of the labels.
P(0) = 1 / (1 + \exp(b)), and
P(1) = \exp(b) / (1 + \exp(b))
, hence
b = \log{P(1) / P(0)} = \log{count_1 / count_0}
*/
initialCoefficientsWithIntercept.toArray(numFeatures) = math.log(histogram(1) / histogram(0))
}
// Run the optimizer iterations.
val states = optimizer.iterations(new CachedDiffFunction(costFun),
initialCoefficientsWithIntercept.toBreeze.toDenseVector)
// Collect the final state and the objective history.
val (coefficients, intercept, objectiveHistory) = {
/*
Note that in Logistic Regression, the objective history (loss + regularization)
is log-likelihood which is invariance under feature standardization. As a result,
the objective history from optimizer is the same as the one in the original space.
*/
val arrayBuilder = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
while (states.hasNext) {
state = states.next()
arrayBuilder += state.adjustedValue
}
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
logError(msg)
throw new SparkException(msg)
}
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val rawCoefficients = state.x.toArray.clone()
var i = 0
while (i < numFeatures) {
rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
i += 1
}
// Split the raw solution into (coefficients, intercept, objective history).
if ($(fitIntercept)) {
(Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
arrayBuilder.result())
} else {
(Vectors.dense(rawCoefficients).compressed, 0.0, arrayBuilder.result())
}
}
if (handlePersistence) instances.unpersist()
// Build the model and attach the training summary.
val model = copyValues(new LogisticRegressionModel(uid, coefficients, intercept))
val logRegSummary = new BinaryLogisticRegressionTrainingSummary(
model.transform(dataset),
$(probabilityCol),
$(labelCol),
$(featuresCol),
objectiveHistory)
model.setSummary(logRegSummary)
}
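For completeness, a minimal, hypothetical end-to-end use of the estimator (Spark 1.6-style; sqlContext and the toy two-row dataset are assumptions for illustration):
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors
// Toy training data (label, features); a real job would load this from its own pipeline.
val training = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0))
)).toDF("label", "features")
val lr = new LogisticRegression().setRegParam(0.01).setElasticNetParam(0.0)
val model = lr.fit(training)   // runs the train() method shown above
println(s"coefficients = ${model.coefficients}, intercept = ${model.intercept}")
model.transform(training).select("probability", "prediction").show()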
5. Regularization
- L2 regularization -> ridge
- L1 regularization -> lasso
- a mix of L1 and L2 -> elastic net
The corresponding penalty terms:
\[l_{reg}(\vec{w})=\frac{\lambda}{2}\sum_{i}{w_{i}}^2\] \[l_{reg}(\vec{w})=\lambda\sum_{i}|w_i|\] \[l_{reg}(\vec{w})=\lambda\sum_{i}\left(\alpha|w_i|+\frac{1-\alpha}{2}{w_i}^2\right)\]These map onto the code below (note that regParam = regParamL1 + regParamL2):
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
There are two regularization methods, L1 and L2. L2 regularization corresponds to a Gaussian prior on the model parameters; the L2 penalty is smoother than L1 and therefore easier to optimize. L1 corresponds to a Laplace prior; it produces sparse solutions and thus acts as a form of feature selection.
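Putting the pieces together, with this $\alpha$ convention the full objective Spark minimizes can be written as below; for example, regParam = 0.1 and elasticNetParam = 0.3 give regParamL1 = 0.03 and regParamL2 = 0.07:
\[\min_{\vec{w}} \ \frac{1}{\sum_k weight_k}\sum_{k=1}^{N} weight_k \, l_k(\vec{w}, \vec{x}_k) + \lambda\left(\alpha\sum_i |w_i| + \frac{1-\alpha}{2}\sum_i {w_i}^2\right)\]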
LBFGS and OWLQN
OK — so the core of the model essentially comes down to these two methods: LBFGS and OWLQN. Both are quasi-Newton variants whose core update is:
\[\vec{w}_{n+1}=\vec{w}_{n}-H^{-1}\vec{G}\]How the (approximate inverse) Hessian is obtained is not covered in detail here; if you want to dig into the math, take a look at the implementations of these two methods in the Breeze library:
- L-BFGS: Limited-memory BFGS, where BFGS stands for its four authors Broyden, Fletcher, Goldfarb and Shanno.
- OWL-QN: the Orthant-Wise Limited-memory Quasi-Newton algorithm.
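The "limited-memory" part means that $H^{-1}$ is never formed explicitly; it is approximated from only the most recent m update pairs (m = 10 in the code above),
\[\vec{s}_k = \vec{w}_{k+1} - \vec{w}_{k}, \qquad \vec{y}_k = \vec{G}_{k+1} - \vec{G}_{k},\]and OWL-QN extends the same scheme to objectives with an L1 term by constraining each step to an orthant on which the L1 term is differentiable.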
A quick note on the Breeze library:
Breeze is a numerical-processing library. Its goal is to be generic, clean and powerful without sacrificing much performance; the current version is 0.12. Spark's MLlib is built on top of it (the linear-algebra package breeze.linalg and the optimization package breeze.optimize come up all the time), and it also provides plotting via breeze.plot.
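Mirroring the way train() constructs its optimizer, here is a minimal Breeze OWLQN sketch with an L1 penalty of 0.1 on every coordinate; the toy smooth objective is again an assumption standing in for the real cost function:
import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, OWLQN}
// Toy smooth objective f(x) = 0.5 * ||x - t||^2; OWLQN adds the L1 term on top of it.
val g = new DiffFunction[DenseVector[Double]] {
  def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
    val diff = x - DenseVector.fill(5)(3.0)
    (0.5 * (diff dot diff), diff)
  }
}
// Arguments: (maxIter, m, per-coordinate L1 strength, tolerance) -- the same constructor train() uses.
val owlqn = new OWLQN[Int, DenseVector[Double]](100, 10, (_: Int) => 0.1, 1e-6)
val xSparse = owlqn.minimize(g, DenseVector.zeros[Double](5))  // coordinates shrunk toward 0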
Summary
That covers the whole process. Whether to use L1 or L2 depends on your own tuning. A few more things to keep in mind:
- By default the features are standardized. For some scenarios (e.g. recommendation) with discrete/one-hot features, standardization is not very meaningful and may even hurt the results.
- The intercept b is initialized from the positive/negative label ratio as log(P(1)/P(0)), which speeds up convergence.
- The caching strategy is StorageLevel.MEMORY_AND_DISK. When memory is tight this can hurt performance, which is a drawback.