Analyse a Weather Input Dataset and Apply RDD Aggregation Functions

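The snippets below assume a SparkContext bound to the name sc. A minimal local setup sketch (the appName is illustrative and not part of the original post):

from pyspark.sql import SparkSession

# Build a local SparkSession and expose its SparkContext as `sc`
spark = SparkSession.builder.master('local[*]').appName('weather-rdd-aggregations').getOrCreate()
sc = spark.sparkContext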
#Input weather dataset

fileRdd = sc.textFile('weather.csv')
fileRdd.collect()

Output:

['2016-05-09,234893,34', '2019-09-08,234896,3', '2019-11-19,234895,24', '2017-04-04,234900,43', '2013-12-04,234900,47', '2019-08-28,234894,5', '2013-11-29,234897,-5', '2018-08-19,234895,40', '2019-06-06,234890,9', '2017-02-09,234900,21', '2017-03-30,234893,36', '2019-01-01,234895,-9', '2010-12-23,234898,-1', '2015-09-03,234890,13', '2011-07-19,234898,25', '2014-09-27,234897,29', '2013-11-18,234891,-9', '2012-02-09,234893,10', '2014-07-03,234897,29', '2011-11-07,234895,38', '2014-02-14,234891,24', '2012-02-18,234893,5', '2010-01-31,234896,-8', '2015-08-19,234890,-7', '2017-03-26,234891,-1', '2011-04-23,234894,23', '2014-09-15,234898,-8', '2011-06-16,234890,33', '2018-04-15,234899,22', '2019-04-18,234896,40', '2018-06-17,234895,45', '2010-03-08,234900,36', '2014-03-22,234892,15', '2019-07-10,234893,-3', '2019-06-16,234896,39', '2017-08-24,234899,-5', '2011-01-23,234890,10', '2011-12-14,234898,35', '2010-05-11,234894,39', '2013-10-24,234894,9', '2018-06-10,234894,16', '2014-01-27,234892,7', '2014-03-01,234893,15', '2018-02-18,234898,6', '2015-12-02,234891,1', '2017-03-03,234896,9', '2013-04-19,234895,-8', '2016-02-03,234900,8', '2017-04-09,234892,-10', '2011-02-10,234890,-10', '2011-04-12,234892,-1', '2013-02-06,234896,15', '2018-10-14,234897,4', '2014-02-01,234898,6', '2018-05-07,234891,31', '2019-04-04,234897,39', '2015-11-30,234898,29', '2017-02-26,234892,31', '2013-11-10,234899,19', '2010-02-09,234893,19', '2010-12-04,234894,-5', '2016-03-07,234900,-2', '2013-01-16,234898,-5', '2015-12-06,234893,41', '2010-07-02,234895,11', '2015-07-23,234894,0', '2013-08-21,234892,12', '2015-01-10,234895,15', '2010-06-27,234898,19', '2013-08-19,234900,29', '2018-06-11,234898,12', '2019-01-12,234891,-4', '2015-09-04,234896,1', '2018-07-04,234897,8', '2014-06-26,234900,1', '2012-02-22,234894,40', '2013-03-27,234890,41', '2013-08-16,234899,3', '2013-02-05,234895,15', '2018-07-11,234899,7', '2010-03-30,234892,11', '2010-12-27,234899,1', '2011-09-05,234894,27', '2019-11-19,234898,47', '2014-06-14,234895,9', '2015-10-10,234895,1', '2018-08-07,234899,44', '2013-04-14,234893,16', '2018-08-05,234892,14', '2011-01-29,234898,17', '2013-10-08,234892,40', '2014-01-10,234895,6', '2010-12-20,234890,13', '2011-06-04,234893,-9', '2014-09-19,234895,35', '2017-09-22,234897,47', '2012-07-10,234892,-8', '2016-08-07,234894,36', '2018-10-30,234897,45', '2010-11-16,234896,15', '2020-11-16,234896,45', '2020-11-16,234896,15']

#Using groupByKey with mapValues(max); the temperature is kept as a string here,
#so max compares the values as text, not as numbers
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], line.split(',')[2])).\
groupByKey().mapValues(max)
fileRdd.collect()

Output:

[('2016', '8'), ('2019', '9'), ('2017', '9'), ('2013', '9'), ('2010', '39'), ('2015', '41'), ('2011', '38'), ('2014', '9'), ('2012', '5'), ('2020', '45'), ('2018', '8')]
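Because the temperature stays a string above, max is lexicographic ('9' beats '36'), which is why the output looks off. A sketch of the same approach with the temperature cast to int (an addition, not part of the original post); the yearly maxima then agree with the reduceByKey results shown further below:

#Using groupByKey with a numeric max
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
groupByKey().mapValues(max)
fileRdd.collect()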

#Using combineByKey to compute a key-wise (sum, count) pair
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
combineByKey(
lambda value: (value, 1),                    # createCombiner: first value seen for a key
lambda x, value: (x[0] + value, x[1] + 1),   # mergeValue: fold the next value into (sum, count)
lambda x, y: (x[0] + y[0], x[1] + y[1])      # mergeCombiners: merge partial (sum, count) pairs
)
fileRdd.collect()

Output:

[('2016', (76, 4)), ('2019', (190, 11)), ('2017', (171, 9)), ('2013', (219, 15)), ('2010', (150, 12)), ('2015', (94, 9)), ('2011', (188, 11)), ('2014', (168, 12)), ('2012', (47, 4)), ('2020', (60, 2)), ('2018', (294, 13))]
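A natural follow-up (a sketch, not in the original post; the avgRdd name is illustrative): divide each key-wise sum by its count with one more mapValues step to get the average temperature per year.

#Key-wise average from the (sum, count) pairs produced above
avgRdd = fileRdd.mapValues(lambda x: x[0] / x[1])
avgRdd.collect()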

#Using reduceByKey with a list-building lambda; note that this nests lists pair by pair
#rather than producing one flat key-wise list of records
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
reduceByKey(
lambda a, b: [a, b]
)
fileRdd.collect()

Output:

[('2016', [[34, 8], [-2, 36]]), ('2019', [[[[[[[[3, 24], 5], 9], -9], 40], -3], 39], [[39, -4], 47]]), ('2017', [[[[[[[43, 21], 36], -1], -5], 9], -10], [31, 47]]), ('2013', [[[[[47, -5], -9], 9], -8], [[[[[[[[[15, 19], -5], 12], 29], 41], 3], 15], 16], 40]]), ('2010', [[[[-1, -8], 36], 39], [[[[[[[19, -5], 11], 19], 11], 1], 13], 15]]), ('2015', [[[13, -7], 1], [[[[[29, 41], 0], 15], 1], 1]]), ('2011', [[[[[[[[25, 38], 23], 33], 10], 35], -10], -1], [[27, 17], -9]]), ('2014', [[[[[[[29, 29], 24], -8], 15], 7], 15], [[[[6, 1], 9], 6], 35]]), ('2012', [[10, 5], [40, -8]]), ('2020', [45, 15]), ('2018', [[[[[40, 22], 45], 16], 6], [[[[[[[4, 31], 12], 8], 7], 44], 14], 45]])]
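Because the reducer wraps pairs in new lists at every step, the result above is nested rather than a flat key-wise list of records. Two common ways to get an actual flat list per key, shown as a sketch (an addition, not part of the original post):

#Key-wise list of records using groupByKey
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
groupByKey().mapValues(list)
fileRdd.collect()

#Same result with combineByKey and list accumulators
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
combineByKey(
lambda value: [value],              # createCombiner: start a list for the key
lambda acc, value: acc + [value],   # mergeValue: append within a partition
lambda acc1, acc2: acc1 + acc2      # mergeCombiners: concatenate across partitions
)
fileRdd.collect()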

#Using reduceByKey to find the max per key
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
reduceByKey(
lambda a, b: max(a, b)
)
fileRdd.collect()

Output:

[('2016', 36), ('2019', 47), ('2017', 47), ('2013', 47), ('2010', 39), ('2015', 41), ('2011', 38), ('2014', 35), ('2012', 40), ('2020', 45), ('2018', 45)]
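Since the built-in max already takes two arguments, it can also be passed to reduceByKey directly instead of being wrapped in a lambda; the result is the same (a sketch, not part of the original post):

#Passing the built-in max directly
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
reduceByKey(max)
fileRdd.collect()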

#Using reduceByKey to find the max per key with a user-defined function
def decision(a, b):
    if a > b:
        return a
    else:
        return b

fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
reduceByKey(
lambda a, b: decision(a, b)
)
fileRdd.collect()

Output:

[('2016', 36), ('2019', 47), ('2017', 47), ('2013', 47), ('2010', 39), ('2015', 41), ('2011', 38), ('2014', 35), ('2012', 40), ('2020', 45), ('2018', 45)]
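Along the same lines, aggregateByKey (another aggregation RDD function, added here as a sketch and not covered in the original post) can collect several statistics in a single pass, for example (min, max, count) per year:

#Key-wise (min, max, count) in one pass with aggregateByKey
zero = (float('inf'), float('-inf'), 0)
fileRdd = sc.textFile('weather.csv').\
map(lambda line : (line.split(',')[0].split('-')[0], int(line.split(',')[2]))).\
aggregateByKey(
zero,
lambda acc, v: (min(acc[0], v), max(acc[1], v), acc[2] + 1),   # fold a value into the accumulator
lambda a, b: (min(a[0], b[0]), max(a[1], b[1]), a[2] + b[2])   # merge accumulators across partitions
)
fileRdd.collect()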

 
