How to standardize a column in PySpark without using StandardScaler?
Seems like this should work, but I'm getting errors:
mu = mean(df[input])
sigma = stddev(df[input])
dft = df.withColumn(output, (df[input]-mu)/sigma)
pyspark.sql.utils.AnalysisException: "grouping expressions sequence is empty, and '`user`' is not an aggregate function. Wrap '(((CAST(`sum(response)` AS DOUBLE) - avg(`sum(response)`)) / stddev_samp(CAST(`sum(response)` AS DOUBLE))) AS `scaled`)' in windowing function(s) or wrap '`user`' in first() (or first_value) if you don't care which value you get.;;
Aggregate [user#0, sum(response)#26L, ((cast(sum(response)#26L as double) - avg(sum(response)#26L)) / stddev_samp(cast(sum(response)#26L as double))) AS scaled#46]
+- AnalysisBarrier
   +- Aggregate [user#0], [user#0, sum(cast(response#3 as bigint)) AS sum(response)#26L]
      +- Filter item_id#1 IN (129,130,131,132,133,134,135,136,137,138)
         +- Relation[user#0,item_id#1,response_value#2,response#3,trait#4,response_timestamp#5] csv
"
I'm not sure what's going on with this error message.
2 Answers
Using collect() is not a good solution in general, and you will see that this will not scale as your data grows.
If you don't want to use StandardScaler, a better way is to use a Window to compute the mean and standard deviation.
Borrowing the same example from StandardScaler in Spark not working as expected:
import numpy as np
from pyspark.sql.functions import col, mean, stddev
from pyspark.sql import Window
df = spark.createDataFrame(
np.array(range(1,10,1)).reshape(3,3).tolist(),
["int1", "int2", "int3"]
)
df.show()
#+----+----+----+
#|int1|int2|int3|
#+----+----+----+
#| 1| 2| 3|
#| 4| 5| 6|
#| 7| 8| 9|
#+----+----+----+
Suppose you wanted to standardize the column int2:
input_col = "int2"
output_col = "int2_scaled"
w = Window.partitionBy()
mu = mean(input_col).over(w)
sigma = stddev(input_col).over(w)
df.withColumn(output_col, (col(input_col) - mu)/(sigma)).show()
#+----+----+----+-----------+
#|int1|int2|int3|int2_scaled|
#+----+----+----+-----------+
#| 1| 2| 3| -1.0|
#| 7| 8| 9| 1.0|
#| 4| 5| 6| 0.0|
#+----+----+----+-----------+
If you wanted to use the population standard deviation as in the other example, replace pyspark.sql.functions.stddev with pyspark.sql.functions.stddev_pop().
Good answer! Thanks.
– Evan Zamir
Aug 10 at 17:22
Fortunately, I was able to find code that works:
from pyspark.sql.functions import mean, stddev

# Collect the mean and standard deviation to the driver as a single Row
summary = df.select(mean(input).alias('mu'), stddev(input).alias('sigma')).collect().pop()
dft = df.withColumn(output, (df[input] - summary.mu) / summary.sigma)
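If you need to do this for several columns, wrapping the same collect-based approach in a small helper keeps things tidy (a sketch only; standardize_column is a hypothetical name, not part of PySpark):
from pyspark.sql.functions import col, mean, stddev

def standardize_column(df, input_col, output_col):
    # Collect the column's mean and (sample) standard deviation to the driver,
    # then use them to add a standardized copy of the column.
    stats = df.select(
        mean(input_col).alias('mu'),
        stddev(input_col).alias('sigma')
    ).collect().pop()
    return df.withColumn(output_col, (col(input_col) - stats.mu) / stats.sigma)

dft = standardize_column(df, 'int2', 'int2_scaled')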
@pault Feel free to contribute an answer you think is better.
– Evan Zamir
Aug 9 at 22:42
That works only for vectors.
– Evan Zamir
Aug 9 at 21:47