How to standardize a column in PySpark without using StandardScaler?


Seems like this should work, but I'm getting errors:


mu = mean(df[input])
sigma = stddev(df[input])
dft = df.withColumn(output, (df[input]-mu)/sigma)


pyspark.sql.utils.AnalysisException: "grouping expressions sequence is
empty, and '`user`' is not an aggregate function. Wrap
'(((CAST(`sum(response)` AS DOUBLE) - avg(`sum(response)`)) /
stddev_samp(CAST(`sum(response)` AS DOUBLE))) AS `scaled`)' in
windowing function(s) or wrap '`user`' in first() (or first_value) if
you don't care which value you get.;;
Aggregate [user#0, sum(response)#26L, ((cast(sum(response)#26L as double) - avg(sum(response)#26L)) / stddev_samp(cast(sum(response)#26L as double))) AS scaled#46]
+- AnalysisBarrier
   +- Aggregate [user#0], [user#0, sum(cast(response#3 as bigint)) AS sum(response)#26L]
      +- Filter item_id#1 IN (129,130,131,132,133,134,135,136,137,138)
         +- Relation[user#0,item_id#1,response_value#2,response#3,trait#4,response_timestamp#5] csv
"



I'm not sure what's going on with this error message.





That works only for vectors.
– Evan Zamir
Aug 9 at 21:47




2 Answers



Using collect() is not a good solution in general, and you will see that this approach will not scale as your data grows.

If you don't want to use StandardScaler, a better way is to use a Window to compute the mean and standard deviation.



Borrowing the same example from StandardScaler in Spark not working as expected:


import numpy as np
from pyspark.sql.functions import col, mean, stddev
from pyspark.sql import Window

df = spark.createDataFrame(
    np.array(range(1, 10, 1)).reshape(3, 3).tolist(),
    ["int1", "int2", "int3"]
)
df.show()
#+----+----+----+
#|int1|int2|int3|
#+----+----+----+
#|   1|   2|   3|
#|   4|   5|   6|
#|   7|   8|   9|
#+----+----+----+



Suppose you wanted to standardize the column int2:




input_col = "int2"
output_col = "int2_scaled"

w = Window.partitionBy()

mu = mean(input_col).over(w)
sigma = stddev(input_col).over(w)

df.withColumn(output_col, (col(input_col) - mu)/(sigma)).show()
#+----+----+----+-----------+
#|int1|int2|int3|int2_scaled|
#+----+----+----+-----------+
#|   1|   2|   3|       -1.0|
#|   7|   8|   9|        1.0|
#|   4|   5|   6|        0.0|
#+----+----+----+-----------+



If you wanted to use the population standard deviation as in the other example, replace pyspark.sql.functions.stddev with pyspark.sql.functions.stddev_pop().
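
For example, a minimal sketch of that variant, reusing w, mu, input_col, and output_col from the snippet above:


from pyspark.sql.functions import col, stddev_pop

# Population standard deviation divides by N rather than N-1,
# so the scaled values differ slightly from the sample-based version
sigma_pop = stddev_pop(input_col).over(w)

df.withColumn(output_col, (col(input_col) - mu) / sigma_pop).show()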





Good answer! Thanks.
– Evan Zamir
Aug 10 at 17:22




Fortunately, I was able to find code that works:


from pyspark.sql.functions import mean, stddev

# Collect the aggregates to the driver, then use the plain Python values
summary = df.select([mean(input).alias('mu'), stddev(input).alias('sigma')]) \
    .collect().pop()
dft = df.withColumn(output, (df[input] - summary.mu) / summary.sigma)
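

As a quick sanity check, here is a sketch of the same idea on the example DataFrame from the other answer, assuming input = 'int2' and output = 'int2_scaled'. Collecting the single aggregated row turns mu and sigma into plain Python floats, so the withColumn expression no longer mixes aggregate and non-aggregate columns:


from pyspark.sql.functions import mean, stddev

input, output = "int2", "int2_scaled"  # assumed names for this sketch

row = df.select(mean(input).alias('mu'), stddev(input).alias('sigma')).collect().pop()
df.withColumn(output, (df[input] - row.mu) / row.sigma).show()
# int2 values 2, 5, 8 scale to -1.0, 0.0, 1.0, matching the Window-based answer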





@pault Feel free to contribute an answer you think is better.
– Evan Zamir
Aug 9 at 22:42






