>>> emps = sc.parallelize([(1,'Avc Ser',2300,'L1'),(2,'Kghf Jfjs',3400,'L1'),(3,'Jgf Fjf',2455,'L2'),(4,'Ejh Ruyg',3500,'L3'),(5,'Efgw Ygefrf',5400,'L4'),(6,'Fuyg Ruyoi',4020,'L4'),(7,'Gfgd Tefdf',3500,'L4')])
>>> locs = sc.parallelize([('L1','Accounts','New York'),('L2','Sales','London'),('L3','Marketing','Dubai'),('L4','RND','Mumbai')])
>>> emps.collect()
[(1, 'Avc Ser', 2300, 'L1'), (2, 'Kghf Jfjs', 3400, 'L1'), (3, 'Jgf Fjf', 2455, 'L2'), (4, 'Ejh Ruyg', 3500, 'L3'), (5, 'Efgw Ygefrf', 5400, 'L4'), (6, 'Fuyg Ruyoi', 4020, 'L4'), (7, 'Gfgd Tefdf', 3500, 'L4')]
>>> locs.collect()
[('L1', 'Accounts', 'New York'), ('L2', 'Sales', 'London'), ('L3', 'Marketing', 'Dubai'), ('L4', 'RND', 'Mumbai')]
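We cannot join the employee RDD and the location RDD directly, because their keys (the first element of each tuple) are different: emps is keyed by employee id, locs by location id.
The location id has to come first, so we need to build a new RDD from emps with the location id as the first element.
Note: a pair-RDD join treats each tuple as (key, value) and uses only the first two elements, so the join output below carries only the employee id and the department name; the remaining fields are dropped.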
emps - (1, 'Avc Ser', 2300, 'L1'), new rdd - ('L1', 1, 'Avc Ser', 2300)
>>> emplocs = emps.map(lambda e: (e[3],e[0],e[1],e[2]))
>>> emplocs.collect()
[('L1', 1, 'Avc Ser', 2300), ('L1', 2, 'Kghf Jfjs', 3400), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 7, 'Gfgd Tefdf', 3500)]
>>> emplocs.join(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
Count of employees of every location
>>> emplocs.join(locs).groupByKey().mapValues(len).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]
Grouping with sort - default is ascending order
>>> emplocs.join(locs).groupByKey().mapValues(len).sortByKey().collect()
[('L1', 2), ('L2', 1), ('L3', 1), ('L4', 3)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortByKey(True).collect()
[('L1', 2), ('L2', 1), ('L3', 1), ('L4', 3)]
Descending order
>>> emplocs.join(locs).groupByKey().mapValues(len).sortByKey(False).collect()
[('L4', 3), ('L3', 1), ('L2', 1), ('L1', 2)]
Sort by value of K,V paired RDD
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: r[1]).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]
Reverse sort by value of K,V paired RDD
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: r[1],False).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]
Sort by multiple values
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: (r[1],r[0]),False).collect()
[('L4', 3), ('L1', 2), ('L3', 1), ('L2', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: (r[1],r[0]),False).collect()
[('L4', 3), ('L1', 2), ('L3', 1), ('L2', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: (r[1],r[0]),False).collect()
[('L4', 3), ('L1', 2), ('L3', 1), ('L2', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: (r[0],r[1]),False).collect()
[('L4', 3), ('L3', 1), ('L2', 1), ('L1', 2)]
>>> emplocs.join(locs).groupByKey().mapValues(len).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]
>>> emplocs.join(locs).groupByKey().mapValues(len).groupByKey().collect()
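[('L2', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L3', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L1', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L4', <pyspark.resultiterable.ResultIterable object at 0x...>)]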
>>> emplocs.join(locs).groupByKey().mapValues(len).sortByKey().collect()
[('L1', 2), ('L2', 1), ('L3', 1), ('L4', 3)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: l[1]).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: -l[1]).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (-l[1],l[0])).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (-l[1],l[0])).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (-l[1],l[0]),False).collect()
[('L3', 1), ('L2', 1), ('L1', 2), ('L4', 3)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],l[0]),False).collect()
[('L4', 3), ('L1', 2), ('L3', 1), ('L2', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],l[0]),True).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],l[0]),False).collect()
[('L4', 3), ('L1', 2), ('L3', 1), ('L2', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],-ord(l[0][0])),False).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],-ord(l[0][0])),False).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]
Joins with missing or unmatched keys (outer joins fill the missing side with None)
>>> emps = sc.parallelize([(1,'Avc Ser',2300,'L1'),(2,'Kghf Jfjs',3400,'L1'),(3,'Jgf Fjf',2455,'L2'),(4,'Ejh Ruyg',3500,'L3'),(5,'Efgw Ygefrf',5400,'L4'),(6,'Fuyg Ruyoi',4020,'L4'),(7,'Gfgd Tefdf',3500,'L4'),(8,'Herkj Gdf',2354,'')])
>>> locs = sc.parallelize([('L1','Accounts','New York'),('L2','Sales','London'),('L3','Marketing','Dubai'),('L4','RND','Mumbai'),('L5','Delivery','Singapore')])
>>> emplocs = emps.map(lambda e: (e[3],e[0],e[1],e[2]))
>>> emplocs.collect()
[('L1', 1, 'Avc Ser', 2300), ('L1', 2, 'Kghf Jfjs', 3400), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 7, 'Gfgd Tefdf', 3500), ('', 8, 'Herkj Gdf', 2354)]
>>> emplocs.join(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
>>> emplocs.join(locs).map(list).collect()
[['L2', (3, 'Sales')], ['L3', (4, 'Marketing')], ['L1', (1, 'Accounts')], ['L1', (2, 'Accounts')], ['L4', (5, 'RND')], ['L4', (6, 'RND')], ['L4', (7, 'RND')]]
>>> emplocs.join(locs).map(len).collect()
[2, 2, 2, 2, 2, 2, 2]
>>> emplocs.join(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
>>> emplocs.join(locs).map(lambda r: (r[0], list(r[1]))).collect()
[('L2', [3, 'Sales']), ('L3', [4, 'Marketing']), ('L1', [1, 'Accounts']), ('L1', [2, 'Accounts']), ('L4', [5, 'RND']), ('L4', [6, 'RND']), ('L4', [7, 'RND'])]
>>> emplocs.join(locs).map(lambda r: (r[0], list(r[1][1]))).collect()
[('L2', ['S', 'a', 'l', 'e', 's']), ('L3', ['M', 'a', 'r', 'k', 'e', 't', 'i', 'n', 'g']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D'])]
>>> emplocs.join(locs).map(lambda r: (r[0], list(r[1]))).collect()
[('L2', [3, 'Sales']), ('L3', [4, 'Marketing']), ('L1', [1, 'Accounts']), ('L1', [2, 'Accounts']), ('L4', [5, 'RND']), ('L4', [6, 'RND']), ('L4', [7, 'RND'])]
>>> emplocs.join(locs).map(lambda r: r[0]).collect()
['L2', 'L3', 'L1', 'L1', 'L4', 'L4', 'L4']
>>> emplocs.join(locs).map(lambda r: list(r[0])).collect()
[['L', '2'], ['L', '3'], ['L', '1'], ['L', '1'], ['L', '4'], ['L', '4'], ['L', '4']]
>>> emplocs.join(locs).map(lambda r: r[0]).collect()
['L2', 'L3', 'L1', 'L1', 'L4', 'L4', 'L4']
>>> emplocs.join(locs).map(lambda r: (r[0],r[1])).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
>>> emplocs.join(locs).map(lambda r: (r[0],r[1][1])).collect()
[('L2', 'Sales'), ('L3', 'Marketing'), ('L1', 'Accounts'), ('L1', 'Accounts'), ('L4', 'RND'), ('L4', 'RND'), ('L4', 'RND')]
>>> emplocs.join(locs).map(lambda r: (r[0],list(r[1][1]))).collect()
[('L2', ['S', 'a', 'l', 'e', 's']), ('L3', ['M', 'a', 'r', 'k', 'e', 't', 'i', 'n', 'g']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D'])]
>>> emplocs.join(locs).map(lambda r: (r[0],r[1][1])).collect()
[('L2', 'Sales'), ('L3', 'Marketing'), ('L1', 'Accounts'), ('L1', 'Accounts'), ('L4', 'RND'), ('L4', 'RND'), ('L4', 'RND')]
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0],x[1][1])).collect()
[('L2', 3, 'Sales'), ('L3', 4, 'Marketing'), ('L1', 1, 'Accounts'), ('L1', 2, 'Accounts'), ('L4', 5, 'RND'), ('L4', 6, 'RND'), ('L4', 7, 'RND')]
>>> from operator import add
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).reduceByKey(add).collect()
[('L2', 3), ('L3', 4), ('L1', 3), ('L4', 18)]
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).reduceByKey(add).collect()
[('L2', 3), ('L3', 4), ('L1', 3), ('L4', 18)]
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).collect()
[('L2', 3), ('L3', 4), ('L1', 1), ('L1', 2), ('L4', 5), ('L4', 6), ('L4', 7)]
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).reduceByKey(add).collect()
[('L2', 3), ('L3', 4), ('L1', 3), ('L4', 18)]
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).collect()
[('L2', 3), ('L3', 4), ('L1', 1), ('L1', 2), ('L4', 5), ('L4', 6), ('L4', 7)]
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).groupByKey().collect()
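[('L2', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L3', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L1', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L4', <pyspark.resultiterable.ResultIterable object at 0x...>)]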
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).countByKey()
defaultdict(<class 'int'>, {'L2': 1, 'L3': 1, 'L1': 2, 'L4': 3})
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).countByKey().items()
dict_items([('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)])
>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).groupByKey().mapValues(list).collect()
[('L2', [3]), ('L3', [4]), ('L1', [1, 2]), ('L4', [5, 6, 7])]
>>> emplocs.collect()
[('L1', 1, 'Avc Ser', 2300), ('L1', 2, 'Kghf Jfjs', 3400), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 7, 'Gfgd Tefdf', 3500), ('', 8, 'Herkj Gdf', 2354)]
>>> locs.collect()
[('L1', 'Accounts', 'New York'), ('L2', 'Sales', 'London'), ('L3', 'Marketing', 'Dubai'), ('L4', 'RND', 'Mumbai'), ('L5', 'Delivery', 'Singapore')]
>>> emplocs.leftOuterJoin(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('', (8, None)), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
>>> emplocs.subtract(locs).collect()
[('L1', 1, 'Avc Ser', 2300), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 6, 'Fuyg Ruyoi', 4020), ('', 8, 'Herkj Gdf', 2354), ('L1', 2, 'Kghf Jfjs', 3400), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 7, 'Gfgd Tefdf', 3500)]
>>> emplocs.leftOuterJoin(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('', (8, None)), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
>>> emplocs.rightOuterJoin(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND')), ('L5', (None, 'Delivery'))]
>>> emplocs.join(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
>>> emplocs.cartesian(locs).collect()
[(('L1', 1, 'Avc Ser', 2300), ('L1', 'Accounts', 'New York')), (('L1', 1, 'Avc Ser', 2300), ('L2', 'Sales', 'London')), (('L1', 1, 'Avc Ser', 2300), ('L3', 'Marketing', 'Dubai')), (('L1', 1, 'Avc Ser', 2300), ('L4', 'RND', 'Mumbai')), (('L1', 1, 'Avc Ser', 2300), ('L5', 'Delivery', 'Singapore')), (('L1', 2, 'Kghf Jfjs', 3400), ('L1', 'Accounts', 'New York')), (('L1', 2, 'Kghf Jfjs', 3400), ('L2', 'Sales', 'London')), (('L1', 2, 'Kghf Jfjs', 3400), ('L3', 'Marketing', 'Dubai')), (('L1', 2, 'Kghf Jfjs', 3400), ('L4', 'RND', 'Mumbai')), (('L1', 2, 'Kghf Jfjs', 3400), ('L5', 'Delivery', 'Singapore')), (('L2', 3, 'Jgf Fjf', 2455), ('L1', 'Accounts', 'New York')), (('L2', 3, 'Jgf Fjf', 2455), ('L2', 'Sales', 'London')), (('L2', 3, 'Jgf Fjf', 2455), ('L3', 'Marketing', 'Dubai')), (('L2', 3, 'Jgf Fjf', 2455), ('L4', 'RND', 'Mumbai')), (('L2', 3, 'Jgf Fjf', 2455), ('L5', 'Delivery', 'Singapore')), (('L3', 4, 'Ejh Ruyg', 3500), ('L1', 'Accounts', 'New York')), (('L3', 4, 'Ejh Ruyg', 3500), ('L2', 'Sales', 'London')), (('L3', 4, 'Ejh Ruyg', 3500), ('L3', 'Marketing', 'Dubai')), (('L3', 4, 'Ejh Ruyg', 3500), ('L4', 'RND', 'Mumbai')), (('L3', 4, 'Ejh Ruyg', 3500), ('L5', 'Delivery', 'Singapore')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L1', 'Accounts', 'New York')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L2', 'Sales', 'London')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L3', 'Marketing', 'Dubai')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 'RND', 'Mumbai')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L5', 'Delivery', 'Singapore')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L1', 'Accounts', 'New York')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L2', 'Sales', 'London')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L3', 'Marketing', 'Dubai')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 'RND', 'Mumbai')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L5', 'Delivery', 'Singapore')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L1', 'Accounts', 'New York')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L2', 'Sales', 'London')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L3', 
'Marketing', 'Dubai')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L4', 'RND', 'Mumbai')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L5', 'Delivery', 'Singapore')), (('', 8, 'Herkj Gdf', 2354), ('L1', 'Accounts', 'New York')), (('', 8, 'Herkj Gdf', 2354), ('L2', 'Sales', 'London')), (('', 8, 'Herkj Gdf', 2354), ('L3', 'Marketing', 'Dubai')), (('', 8, 'Herkj Gdf', 2354), ('L4', 'RND', 'Mumbai')), (('', 8, 'Herkj Gdf', 2354), ('L5', 'Delivery', 'Singapore'))]
>>> emplocs.cartesian(locs).count()
40
Comments
Post a Comment