Saturday, 17 February 2018

Spark: Accessing the next row

Sometimes when you are processing log files with Spark, you need some data fields of the next (or previous) row at hand. For example, suppose you have a data file containing credit card transactions with the following structure:

timestamp, creditCardId, merchantId, amount, ...

Now you want to calculate a velocity measure for every card, such as the minimum time between two consecutive transactions of the same card at different merchants. To solve this problem, we need access to each card transaction together with that card's next transaction, ordered by time.

OK, first you need to load the data file into a dataset; let's use Java syntax to describe the solution:

Dataset<Row> dataset = spark.read().parquet("your_data_file");
dataset.createOrReplaceTempView("ds");
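
Here spark is a SparkSession; if you are starting from scratch, a minimal sketch of the setup looks like this (the application name is just a placeholder):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Entry point to the Spark SQL API; the appName is a placeholder.
SparkSession spark = SparkSession.builder()
        .appName("CardVelocity")
        .getOrCreate();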

Then you need to add a row number to the dataset, ordered by creditCardId and timestamp, as below:

String sql = "select row_number() over (order by creditCardId, timestamp) as rowNum, ";
sql += "timestamp, creditCardId, merchantId, amount, ... from ds";
dataset = spark.sql(sql);
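
If you prefer the DataFrame API over SQL strings, the same row number can be added with a window specification; this is just an equivalent sketch:

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.row_number;

// Same ordering as the SQL version: creditCardId first, then timestamp.
// Note: a window with no partitionBy pulls all rows into a single partition,
// which can become a bottleneck on large datasets.
WindowSpec byCardAndTime = Window.orderBy("creditCardId", "timestamp");
dataset = dataset.withColumn("rowNum", row_number().over(byCardAndTime));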

Now the dataset contains unique row numbers ordered by creditCardId and timestamp. What we need to do is join the dataset with itself on the row number, like this:

dataset.createOrReplaceTempView("ds1");
dataset.createOrReplaceTempView("ds2");
sql = "select ds1.creditCardId as creditCardId, ";
sql += "ds1.timestamp as timestamp1, ds2.timestamp as timestamp2, ";
sql += "ds1.merchantId as merchantId1, ds2.merchantId as merchantId2";
sql += "ds1.amount as amount1, ds2.amount as amount2, ";
...
sql += "from ds1 join ds2 on ds1.rowNum+1 = ds2.rowNum, ds1.creditCardId = ds2.creditCardId"
dataset = spark.sql(sql).toDF();
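
As a side note, the same pairing can be done without a self-join at all, using the lead window function over each card's transactions; a sketch of that alternative:

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lead;

// Look one row ahead within each card's transactions, ordered by time.
WindowSpec perCard = Window.partitionBy("creditCardId").orderBy("timestamp");
Dataset<Row> paired = dataset
        .withColumn("timestamp2", lead(col("timestamp"), 1).over(perCard))
        .withColumn("merchantId2", lead(col("merchantId"), 1).over(perCard))
        .withColumn("amount2", lead(col("amount"), 1).over(perCard));
// Unlike the self-join, the last transaction of each card is kept here,
// with null in the new columns.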

At this point each row of the dataset also contains the next row's timestamp, merchantId, and amount, and you can do any calculation you want.
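
For example, the velocity measure from the beginning of the post, the minimum time a card takes to switch merchants, could be computed from the joined result like this (a sketch; the view name joined is made up, and the cast assumes you want the difference as epoch seconds):

dataset.createOrReplaceTempView("joined");
// Only consecutive transactions at different merchants count as a switch;
// casting a timestamp to long yields seconds since the epoch.
sql = "select creditCardId, ";
sql += "min(cast(timestamp2 as long) - cast(timestamp1 as long)) as minSwitchSeconds ";
sql += "from joined where merchantId1 <> merchantId2 ";
sql += "group by creditCardId";
Dataset<Row> velocity = spark.sql(sql);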
