How to assign a unique Id to the dataset row based on some column value in Spark


I am trying to assign a unique ID to each row of a dataset based on the value of one of its columns.
For example, consider the following dataset:


State  Country  Person
MH     IN       ABC
AP     IN       XYZ
J&K    IN       XYZ
MH     IN       PQR



Now I want to assign a unique ID based on the State column value; whenever a state repeats, the same ID should be reused.
The output should be as follows:


State  Country  Person  Unique_ID
MH     IN       ABC     1
AP     IN       XYZ     2
J&K    IN       XYZ     3
MH     IN       PQR     1



How can I solve this problem using Spark with Java?
Any help would be appreciated.





Just to be clear, are you using Java? The tags seem to suggest otherwise.
– Shaido
Aug 10 at 5:48




3 Answers



Here is one way of doing it with Spark in Java.


package com.stackoverflow.works;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.dense_rank;
import static org.apache.spark.sql.functions.desc;

public class UniqueIdJob {

    @SuppressWarnings("serial")
    public static class Record implements Serializable {

        private String state;
        private String country;
        private String person;

        // Encoders.bean needs a public no-arg constructor
        public Record() {
        }

        public Record(String state, String country, String person) {
            this.state = state;
            this.country = country;
            this.person = person;
        }

        public String getState() {
            return state;
        }

        public void setState(String state) {
            this.state = state;
        }

        public String getCountry() {
            return country;
        }

        public void setCountry(String country) {
            this.country = country;
        }

        public String getPerson() {
            return person;
        }

        public void setPerson(String person) {
            this.person = person;
        }
    }

    private static Dataset<Record> createDataset(SparkSession spark) {
        List<Record> records = new ArrayList<>();
        records.add(new Record("MH", "IN", "ABC"));
        records.add(new Record("AP", "IN", "XYZ"));
        records.add(new Record("J&K", "IN", "XYZ"));
        records.add(new Record("MH", "IN", "PQR"));
        records.add(new Record("AP", "IN", "XYZ1"));
        records.add(new Record("AP", "IN", "XYZ2"));
        Encoder<Record> recordEncoder = Encoders.bean(Record.class);
        return spark.createDataset(records, recordEncoder);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("UniqueIdJob")
                .master("local[2]").getOrCreate();
        Dataset<Record> recordDataset = createDataset(spark);
        // dense_rank over a window ordered by state gives every distinct
        // state its own rank, and repeated states share the same rank.
        WindowSpec windowSpec = Window.orderBy(desc("state"));
        Dataset<Row> rowDataset = recordDataset.withColumn("id", dense_rank().over(windowSpec));
        rowDataset.show();
        spark.stop();
    }
}



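One thing to note about this approach: because the window is ordered by desc("state"), the generated IDs follow reverse alphabetical order of the states (here MH = 1, J&K = 2, AP = 3) rather than the first-appearance order shown in the question's expected output. Rows with the same state still share a single ID, which is the property being asked for.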


It is a bit slower, but you can do something like the following:


select state, country, person, dense_rank() over (order by state) as unique_id from ds;



This should do the job. However, a window function without a PARTITION BY clause can be slow, because Spark has to pull all rows into a single partition to compute the global ordering.
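If it helps, here is a minimal sketch of running that query from Java. The names spark, recordDataset and the view name ds are illustrative and assume a SparkSession and dataset like the ones in the answer above:


// Register the dataset as a temporary view so Spark SQL can query it
recordDataset.createOrReplaceTempView("ds");

// dense_rank() assigns the same rank to rows with the same state,
// so repeated states share one ID
Dataset<Row> withId = spark.sql(
        "select state, country, person, "
        + "dense_rank() over (order by state) as unique_id from ds");
withId.show();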



You can define your own UDF (User Defined Function) and write your own logic to create the unique ID.



In the example below, I have created a UDF that derives the ID from the state's hash code. (Note that String.hashCode can collide in principle, so the IDs are not strictly guaranteed to be unique.)


      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val dataDF=Seq(("MH", "IN","ABC"),("AP", "IN","XYZ"),("J&K","IN","XYZ"),("MH", "IN","PQR")).toDF("State","Country","Person")
dataDF: org.apache.spark.sql.DataFrame = [State: string, Country: string ... 1 more field]

scala> dataDF.createOrReplaceTempView("table1")

scala> def uniqueId(col:String)=col.hashCode
uniqueId: (col: String)Int

scala> spark.udf.register("uniqueid",uniqueId _)
res1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

scala> spark.sql("select state,country,person ,uniqueid(state) as unique_id from table1").show
+-----+-------+------+---------+
|state|country|person|unique_id|
+-----+-------+------+---------+
| MH| IN| ABC| 2459|
| AP| IN| XYZ| 2095|
| J&K| IN| XYZ| 72367|
| MH| IN| PQR| 2459|
+-----+-------+------+---------+
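For completeness, a rough Java equivalent of the same idea. This is only a sketch: the UDF name uniqueid and the view name table1 mirror the Scala session above, and spark is assumed to be an active SparkSession over which the data has already been registered as a view:


import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Register a UDF that maps a state string to its hash code
spark.udf().register("uniqueid",
        (UDF1<String, Integer>) String::hashCode,
        DataTypes.IntegerType);

spark.sql("select state, country, person, uniqueid(state) as unique_id from table1")
     .show();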





