Analyse a Weather Input Dataset and Apply RDD Aggregation Functions

# Input weather dataset: each record has the form date,id,temperature

fileRdd = sc.sparkContext.textFile('weather.csv')
fileRdd.collect()

Output:

['2016-05-09,234893,34', '2019-09-08,234896,3', '2019-11-19,234895,24', '2017-04-04,234900,43', '2013-12-04,234900,47', '2019-08-28,234894,5', '2013-11-29,234897,-5', '2018-08-19,234895,40', '2019-06-06,234890,9', '2017-02-09,234900,21', '2017-03-30,234893,36', '2019-01-01,234895,-9', '2010-12-23,234898,-1', '2015-09-03,234890,13', '2011-07-19,234898,25', '2014-09-27,234897,29', '2013-11-18,234891,-9', '2012-02-09,234893,10', '2014-07-03,234897,29', '2011-11-07,234895,38', '2014-02-14,234891,24', '2012-02-18,234893,5', '2010-01-31,234896,-8', '2015-08-19,234890,-7', '2017-03-26,234891,-1', '2011-04-23,234894,23', '2014-09-15,234898,-8', '2011-06-16,234890,33', '2018-04-15,234899,22', '2019-04-18,234896,40', '2018-06-17,234895,45', '2010-03-08,234900,36', '2014-03-22,234892,15', '2019-07-10,234893,-3', '2019-06-16,234896,39', '2017-08-24,234899,-5', '2011-01-23,234890,10', '2011-12-14,234898,35', '2010-05-11,234894,39', '2013-10-24,234894,9', '2018-06-10,234894,16', '2014-01-27,234892,7', '2014-03-01,234893,15', '2018-02-18,234898,6', '2015-12-02,234891,1', '2017-03-03,234896,9', '2013-04-19,234895,-8', '2016-02-03,234900,8', '2017-04-09,234892,-10', '2011-02-10,234890,-10', '2011-04-12,234892,-1', '2013-02-06,234896,15', '2018-10-14,234897,4', '2014-02-01,234898,6', '2018-05-07,234891,31', '2019-04-04,234897,39', '2015-11-30,234898,29', '2017-02-26,234892,31', '2013-11-10,234899,19', '2010-02-09,234893,19', '2010-12-04,234894,-5', '2016-03-07,234900,-2', '2013-01-16,234898,-5', '2015-12-06,234893,41', '2010-07-02,234895,11', '2015-07-23,234894,0', '2013-08-21,234892,12', '2015-01-10,234895,15', '2010-06-27,234898,19', '2013-08-19,234900,29', '2018-06-11,234898,12', '2019-01-12,234891,-4', '2015-09-04,234896,1', '2018-07-04,234897,8', '2014-06-26,234900,1', '2012-02-22,234894,40', '2013-03-27,234890,41', '2013-08-16,234899,3', '2013-02-05,234895,15', '2018-07-11,234899,7', '2010-03-30,234892,11', '2010-12-27,234899,1', '2011-09-05,234894,27', '2019-11-19,234898,47', '2014-06-14,234895,9', '2015-10-10,234895,1', '2018-08-07,234899,44', '2013-04-14,234893,16', '2018-08-05,234892,14', '2011-01-29,234898,17', '2013-10-08,234892,40', '2014-01-10,234895,6', '2010-12-20,234890,13', '2011-06-04,234893,-9', '2014-09-19,234895,35', '2017-09-22,234897,47', '2012-07-10,234892,-8', '2016-08-07,234894,36', '2018-10-30,234897,45', '2010-11-16,234896,15', '2020-11-16,234896,45', '2020-11-16,234896,15']
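
Every example that follows extracts the year from the date field and the temperature from the last field with the same inline split logic. As a minimal sketch (the helper name parse_record and the variable pairRdd are my own, assuming sc is the SparkSession used throughout this post), the parsing can be written once:

# Hypothetical helper: turn a 'date,id,temperature' line into a (year, temperature) pair
def parse_record(line):
    date, _, temp = line.split(',')
    return (date.split('-')[0], int(temp))

pairRdd = sc.sparkContext.textFile('weather.csv').map(parse_record)
pairRdd.take(3)
# e.g. [('2016', 34), ('2019', 3), ('2019', 24)] for the first three records above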

# Using groupByKey: year-wise maximum temperature
# (note: the temperature is left as a string here, so max() compares lexicographically)
fileRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], line.split(',')[2])).\
    groupByKey().mapValues(max)
fileRdd.collect()

Output:

[('2016', '8'), ('2019', '9'), ('2017', '9'), ('2013', '9'), ('2010', '39'), ('2015', '41'), ('2011', '38'), ('2014', '9'), ('2012', '5'), ('2020', '45'), ('2018', '8')]
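
Because the values were kept as strings, the result above is a lexicographic maximum (for example '9' sorts after '47'). A minimal sketch of the numeric version, which simply casts the temperature to int before grouping (my own variation on the same pipeline):

# Cast the temperature to int so max() compares numerically
fileRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
    groupByKey().mapValues(max)
fileRdd.collect()
# expected to match the reduceByKey(max) results further below, e.g. ('2016', 36), ('2019', 47), ...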

# Using combineByKey: key-wise (sum, count) of temperatures
fileRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
    combineByKey(
        lambda value: (value, 1),                    # createCombiner: first value seen in a partition
        lambda x, value: (x[0] + value, x[1] + 1),   # mergeValue: fold a value into (sum, count)
        lambda x, y: (x[0] + y[0], x[1] + y[1])      # mergeCombiners: merge partial (sum, count) pairs
    )
fileRdd.collect()

Output:

[('2016', (76, 4)), ('2019', (190, 11)), ('2017', (171, 9)), ('2013', (219, 15)), ('2010', (150, 12)), ('2015', (94, 9)), ('2011', (188, 11)), ('2014', (168, 12)), ('2012', (47, 4)), ('2020', (60, 2)), ('2018', (294, 13))]
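
The (sum, count) pairs are the usual intermediate result for a per-key average; a short follow-up sketch (my own addition, not from the original post) that maps them to the mean temperature per year:

# Derive the average temperature per year from the (sum, count) pairs above
avgRdd = fileRdd.mapValues(lambda p: round(p[0] / p[1], 2))
avgRdd.collect()
# e.g. ('2016', 19.0), since 76 / 4 = 19.0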

# Using reduceByKey to gather all values per key
# (note: lambda a, b: [a, b] nests a new list at every merge, so the output below is a tree of
#  pairs rather than a flat list of records; a flat-list alternative follows the output)
fileRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
    reduceByKey(
        lambda a, b: [a, b]
    )
fileRdd.collect()

Output:

[('2016', [[34, 8], [-2, 36]]), ('2019', [[[[[[[[3, 24], 5], 9], -9], 40], -3], 39], [[39, -4], 47]]), ('2017', [[[[[[[43, 21], 36], -1], -5], 9], -10], [31, 47]]), ('2013',  [[[[[47, -5], -9], 9], -8],   [[[[[[[[[15, 19], -5], 12], 29], 41], 3], 15], 16], 40]]), ('2010', [[[[-1, -8], 36], 39], [[[[[[[19, -5], 11], 19], 11], 1], 13], 15]]), ('2015', [[[13, -7], 1], [[[[[29, 41], 0], 15], 1], 1]]), ('2011', [[[[[[[[25, 38], 23], 33], 10], 35], -10], -1], [[27, 17], -9]]), ('2014', [[[[[[[29, 29], 24], -8], 15], 7], 15], [[[[6, 1], 9], 6], 35]]), ('2012', [[10, 5], [40, -8]]), ('2020', [45, 15]), ('2018',  [[[[[40, 22], 45], 16], 6], [[[[[[[4, 31], 12], 8], 7], 44], 14], 45]])]
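
To get the flat per-key list of records that the original heading suggests, a minimal sketch using combineByKey with list accumulators (my own variation; the exact ordering inside each list depends on partitioning):

# Build a flat list of temperatures per year with combineByKey
listRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
    combineByKey(
        lambda value: [value],             # createCombiner: start a list with the first value
        lambda lst, value: lst + [value],  # mergeValue: append a value to the partition's list
        lambda a, b: a + b                 # mergeCombiners: concatenate lists across partitions
    )
listRdd.collect()
# e.g. ('2012', [10, 5, 40, -8]) instead of the nested [[10, 5], [40, -8]] above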

# Using reduceByKey: year-wise maximum temperature
fileRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
    reduceByKey(
        lambda a, b: max(a, b)
    )
fileRdd.collect()

Output:

[('2016', 36), ('2019', 47), ('2017', 47), ('2013', 47), ('2010', 39), ('2015', 41), ('2011', 38), ('2014', 35), ('2012', 40), ('2020', 45), ('2018', 45)]
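
Since the lambda above only forwards its two arguments to Python's built-in max, the same result can be obtained by passing max directly (a small simplification, not part of the original post):

# max is already a two-argument function, so it can be passed to reduceByKey as-is
fileRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
    reduceByKey(max)
fileRdd.collect()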

# Using reduceByKey: year-wise maximum temperature via a user-defined function
def decision(a, b):
    if a > b:
        return a
    else:
        return b

fileRdd = sc.sparkContext.textFile('weather.csv').\
    map(lambda line: (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
    reduceByKey(
        lambda a, b: decision(a, b)
    )
fileRdd.collect()

Output:

[('2016', 36), ('2019', 47), ('2017', 47), ('2013', 47), ('2010', 39), ('2015', 41), ('2011', 38), ('2014', 35), ('2012', 40), ('2020', 45), ('2018', 45)]
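
The collected results come back in no particular key order; an optional sketch (my own addition) that sorts the (year, max temperature) pairs by year before collecting:

# Sort the final (year, max_temperature) pairs by year for readability
fileRdd.sortByKey().collect()
# e.g. [('2010', 39), ('2011', 38), ('2012', 40), ..., ('2020', 45)]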

 
