Spark

How to get the latest record in a Spark DataFrame

how-to-get-latest-record-in-spark-dataframe-24tutorials
scala> val inputDF = sc.parallelize(Seq((1,"oclay",400,"2015-01-01 00:00:00"),(1,"oclay",800,"2018-01-01 00:00:00"))).toDF("pid","pname","price","last_mod")
scala> inputDF.show
+---+-----+-----+-------------------+
|pid|pname|price|           last_mod|
+---+-----+-----+-------------------+
|  1|oclay|  400|2015-01-01 00:00:00|
|  1|oclay|  800|2018-01-01 00:00:00|
+---+-----+-----+-------------------+
scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
/** Returns the latest record per group.
  *
  * Partitions `df` by `partition_col`, orders each partition descending by
  * `sortCols`, and keeps only the top-ranked row of every partition.
  *
  * @param df            input DataFrame
  * @param partition_col non-empty list of column names identifying a group
  * @param sortCols      non-empty list of column names to sort by, descending.
  *                      NOTE(review): ordering uses `array(...).desc`, which compares
  *                      the columns with their native types — a string timestamp sorts
  *                      correctly only in a lexicographically ordered format such as
  *                      "yyyy-MM-dd HH:mm:ss" (as in the example above).
  * @return one row per group: the row that ranks first in descending sort order
  */
def getLatestRecs(df: DataFrame, partition_col: List[String], sortCols: List[String]): DataFrame = {
  // Bug fix: the original passed `xs.head, xs:_*`, which splices the FULL list after
  // the head and therefore duplicates the first column in both the partition spec and
  // the sort array. The varargs overloads want head + tail.
  val part = Window
    .partitionBy(partition_col.head, partition_col.tail: _*)
    .orderBy(array(sortCols.head, sortCols.tail: _*).desc)
  // Rank rows within each partition, keep only the first, then drop the helper column.
  df.withColumn("rn", row_number().over(part))
    .filter(col("rn") === 1)
    .drop("rn")
}

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
getLatestRecs: (df: org.apache.spark.sql.DataFrame, partition_col: List[String], sortCols: List[String])org.apache.spark.sql.DataFrame
scala> val result = getLatestRecs(inputDF,List("pid","pname"),List("last_mod"))
result: org.apache.spark.sql.DataFrame = [pid: int, pname: string, price: int, last_mod: string]
scala> result.show
+---+-----+-----+-------------------+
|pid|pname|price|           last_mod|
+---+-----+-----+-------------------+
|  1|oclay|  800|2018-01-01 00:00:00|
+---+-----+-----+-------------------+

Share This Post

Avatar
An ambivert, music lover, enthusiast, artist, designer, coder, gamer, and content writer. He is a professional software developer with hands-on experience in Spark, Kafka, Scala, Python, Hadoop, Hive, Sqoop, Pig, PHP, HTML, and CSS. Know more about him at www.saikumar.me

Lost Password

Register