pyspark-rdd-practise

RDD Operations

Resilient Distributed Datasets (RDDs) are Spark's primitive data structures. They hold data
distributed across the nodes of a cluster to enable parallel processing and fault tolerance.



Create a simple RDD from a list of strings
>>> langs = sc.parallelize(['Java','Go','Scala','Python'])

>>> langs
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Create the RDD with 6 partitions
>>> langs = sc.parallelize(['Java','Go','Scala','Python'],6)

Map returns a new RDD with the same length and order as the given RDD, applying the given function to each element
>>> langs.map(lambda s: s.title()).collect()
['Java', 'Go', 'Scala', 'Python']

Map transformation converting the given strings to upper case
>>> langs.map(lambda s: s.upper()).collect()
['JAVA', 'GO', 'SCALA', 'PYTHON']

>>> langs.map(lambda s: s.lower()).collect()
['java', 'go', 'scala', 'python']

Filter returns a new RDD containing only the elements that satisfy the given predicate
>>> langs.filter(lambda s: s.startswith('P')).collect()
['Python']

>>> langs.filter(lambda s: s.startswith('J')).collect()
['Java']

>>> langs.first()
'Java'

Glom returns an RDD of lists, one list per partition
Since we created the RDD with 6 partitions, glom returns 6 lists (some of them empty)
>>> langs.glom().collect()
[[], ['Java'], ['Go'], [], ['Scala'], ['Python']]
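The empty partitions fall out of how parallelize slices the input list into index ranges. A pure-Python sketch of the (approximate) slicing rule, for illustration only:

```python
# Approximate rule parallelize uses to slice a list into num_slices partitions:
# partition i gets items[floor(i*n/k) : floor((i+1)*n/k)], where n = len(items).
def slice_like_parallelize(items, num_slices):
    n = len(items)
    return [items[(i * n) // num_slices : ((i + 1) * n) // num_slices]
            for i in range(num_slices)]

print(slice_like_parallelize(['Java', 'Go', 'Scala', 'Python'], 6))
# → [[], ['Java'], ['Go'], [], ['Scala'], ['Python']]
```

With 4 elements and 6 slices, two of the index ranges are empty, matching the glom output above.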

>>> langs.count()
4

>>> langs.foreach(lambda s: print(f"Computer Language:{s}"))
Computer Language:Java
Computer Language:Go
Computer Language:Scala
Computer Language:Python

Fold aggregates the elements of each partition, and then the partition results, by repeatedly applying the given function; the initial (zero) value is applied once per partition and once more at the final merge, which is why it appears several times below
>>> langs.fold("Computer Programming Languages:", (lambda s1,s2: s1+":"+s2))
'Computer Programming Languages::Computer Programming Languages::Computer Programming Languages::Java:Computer Programming Languages::Go:Computer Programming Languages::Computer Programming Languages::Scala:Computer Programming Languages::Python'

>>> langs.fold("", (lambda s1,s2: s1+":"+s2))
':::Java::Go:::Scala::Python'
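The colon pattern above can be reproduced in pure Python: the zero value seeds each of the 6 partitions and then seeds the merge of the 6 partition results as well. A sketch of the semantics (not PySpark's implementation):

```python
from functools import reduce

def rdd_fold(partitions, zero, op):
    # fold each partition, starting from the zero value...
    per_part = [reduce(op, part, zero) for part in partitions]
    # ...then fold the partition results, again starting from zero
    return reduce(op, per_part, zero)

# the 6-partition layout that glom showed earlier
parts = [[], ['Java'], ['Go'], [], ['Scala'], ['Python']]
print(rdd_fold(parts, "", lambda a, b: a + ":" + b))
# → ':::Java::Go:::Scala::Python'
```

The empty partitions contribute the extra bare colons in the result.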

Sample returns a random subset of the RDD:
1st param - withReplacement flag (whether an element may be picked more than once)
2nd param - fraction, the expected probability of each element being included, between 0 and 1
>>> langs.sample(False,0.75).collect()
['Scala', 'Python']

A fraction of 1 keeps every element
>>> langs.sample(False,1).collect()
['Java', 'Go', 'Scala', 'Python']

>>> langs.take(2)
['Java', 'Go']

TakeSample is an action that returns a list (not an RDD) of the given number of elements; the first param is the with-replacement flag
>>> langs.takeSample(True,1)
['Scala']

>>> langs.takeSample(True,2)
['Scala', 'Java']

>>> langs.takeSample(True,3)
['Go', 'Python', 'Python']

>>> langs.takeSample(False,3)
['Scala', 'Go', 'Python']


>>> langs.map(lambda s: s.lower()).collect()
['java', 'go', 'scala', 'python']

>>> langs.flatMap(lambda s: s.lower()).collect()
['j', 'a', 'v', 'a', 'g', 'o', 's', 'c', 'a', 'l', 'a', 'p', 'y', 't', 'h', 'o', 'n']
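The difference between map and flatMap can be mimicked with plain Python: map keeps one output per input, while flatMap flattens each returned iterable into the result (here, a string into its characters). A sketch:

```python
from itertools import chain

langs = ['Java', 'Go', 'Scala', 'Python']

mapped = [s.lower() for s in langs]                          # one item per input
flat = list(chain.from_iterable(s.lower() for s in langs))   # flattened characters

print(mapped)  # → ['java', 'go', 'scala', 'python']
print(flat)    # → ['j', 'a', 'v', 'a', 'g', 'o', ...]
```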

>>> langs.takeSample(False,3)
['Go', 'Scala', 'Python']

>>> langorder = sc.parallelize([1,2,3,4])

Zip pairs each element of this RDD with the corresponding element of another, returning key-value pairs
Both RDDs must have the same size and the same number of partitions
>>> langs.zip(langorder).collect()
Error: the two RDDs have the same size but a different number of partitions

>>> langorder = sc.parallelize([1,2,3,4],6)

>>> langs.zip(langorder).collect()
[('Java', 1), ('Go', 2), ('Scala', 3), ('Python', 4)]

Zip with the Index - pairs each element with its index, starting from 0
>>> langs.zipWithIndex().collect()
[('Java', 0), ('Go', 1), ('Scala', 2), ('Python', 3)]

Zip with Unique ID - pairs each element with a unique id computed from its position within its partition and the partition index; the ids are unique but not necessarily consecutive
>>> langs.zipWithUniqueId().collect()
[('Java', 1), ('Go', 2), ('Scala', 4), ('Python', 5)]
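The ids above follow the documented formula k*n + p, where n is the number of partitions, p the partition index, and k the element's position within its partition. Recomputing them in plain Python for the 6-partition layout shown by glom:

```python
# the 6-partition layout from glom().collect()
partitions = [[], ['Java'], ['Go'], [], ['Scala'], ['Python']]
n = len(partitions)

# id = position-within-partition * number-of-partitions + partition-index
pairs = [(item, k * n + p)
         for p, part in enumerate(partitions)
         for k, item in enumerate(part)]
print(pairs)
# → [('Java', 1), ('Go', 2), ('Scala', 4), ('Python', 5)]
```

Each element is alone in its partition (k = 0), so its id is simply its partition index.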

Number Operations

Python's range generates the integers from start (default: 0) up to, but not including, the end value; parallelize turns them into an RDD
>>> nums = sc.parallelize(range(10))

>>> nums.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> nums.map(lambda n: "even" if n%2==0 else "odd").collect()
['even', 'odd', 'even', 'odd', 'even', 'odd', 'even', 'odd', 'even', 'odd']

>>> nums.max()
9
>>> nums.min()
0

>>> nums.stats()
(count: 10, mean: 4.5, stdev: 2.8722813232690143, max: 9, min: 0)

>>> nums.aggregate(0,lambda a,b: a+b,lambda x,y: x+y)
45
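Aggregate generalizes fold: a seqOp folds the elements within each partition starting from the zero value, and a combOp merges the partition results. A classic use is computing a mean in one pass by carrying a (sum, count) pair; a pure-Python sketch of the semantics:

```python
from functools import reduce

def rdd_aggregate(partitions, zero, seq_op, comb_op):
    # fold within each partition with seq_op, then merge results with comb_op
    per_part = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_part, zero)

parts = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]   # an assumed 2-partition split

# accumulate (running_sum, running_count), merge pairwise across partitions
total, count = rdd_aggregate(
    parts, (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(total / count)  # → 4.5
```

With both ops set to addition and zero 0, this reduces to the sum of 45 shown above.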

>>> nums.count()
10

>>> nums.filter(lambda num: num%2 == 0).collect()
[0, 2, 4, 6, 8]

>>> nums.filter(lambda num: num%2 > 0).collect()
[1, 3, 5, 7, 9]

>>> nums.fold(0,lambda n1,n2: n1+n2)
45

>>> nums.sum()
45

SumApprox returns an approximate sum within the given timeout (in milliseconds)
>>> nums.sumApprox(100)
45.0

>>> nums.sortBy(lambda a: a,False).collect()
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

>>> nums.sortBy(lambda a: a,True).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> nums.take(23)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> nums.takeOrdered(3)
[0, 1, 2]

Default is ascending order
>>> nums.takeOrdered(3, lambda x: x)
[0, 1, 2]

Reverse order
>>> nums.takeOrdered(3, lambda x: -x)
[9, 8, 7]
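takeOrdered(n, key) behaves like Python's heapq.nsmallest(n, data, key): negating the key, as above, yields the n largest elements. For comparison:

```python
import heapq

nums = range(10)
print(heapq.nsmallest(3, nums))                    # → [0, 1, 2]
print(heapq.nsmallest(3, nums, key=lambda x: -x))  # → [9, 8, 7]
```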

>>> nums.variance()
8.25

>>> numtypes = nums.map(lambda num: 'even' if num%2 == 0 else 'odd')
>>> numtypes.collect()
['even', 'odd', 'even', 'odd', 'even', 'odd', 'even', 'odd', 'even', 'odd']

>>> nums.zip(numtypes)
org.apache.spark.api.java.JavaPairRDD@1cd62c10

>>> nums.zip(numtypes).collect()
[(0, 'even'), (1, 'odd'), (2, 'even'), (3, 'odd'), (4, 'even'), (5, 'odd'), (6, 'even'), (7, 'odd'), (8, 'even'), (9, 'odd')]

>>> nums.zipWithIndex().collect()
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]

>>> nums.zipWithUniqueId().collect()
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]



>>> colorshexcodes = sc.parallelize([('red','#FF0000'),('yellow','#FFFF00'),('green','#008000'),('blue','#0000FF'),('white','#FFFFFF'),('black','#000000'),('brown','#A52A2A')])

>>> colorshexcodes.collect()
[('red', '#FF0000'), ('yellow', '#FFFF00'), ('green', '#008000'), ('blue', '#0000FF'), ('white', '#FFFFFF'), ('black', '#000000'), ('brown', '#A52A2A')]

>>> colorshexcodes.isEmpty()
False

>>> colorshexcodes.collectAsMap()
{'red': '#FF0000', 'yellow': '#FFFF00', 'green': '#008000', 'blue': '#0000FF', 'white': '#FFFFFF', 'black': '#000000', 'brown': '#A52A2A'}

>>> colorshexcodes.foreach(print)
('red', '#FF0000')
('yellow', '#FFFF00')
('green', '#008000')
('blue', '#0000FF')
('white', '#FFFFFF')
('black', '#000000')
('brown', '#A52A2A')

>>> colorshexcodes.filter(lambda c: c[0].startswith('w')).collect()
[('white', '#FFFFFF')]

>>> colorshexcodes.filter(lambda c: c[1].startswith('#')).collect()
[('red', '#FF0000'), ('yellow', '#FFFF00'), ('green', '#008000'), ('blue', '#0000FF'), ('white', '#FFFFFF'), ('black', '#000000'), ('brown', '#A52A2A')]

GroupByKey groups the values of each key into an iterable
>>> colorshexcodes.groupByKey().collect()
[('red', <pyspark.resultiterable.ResultIterable object at 0x...>), ('yellow', <pyspark.resultiterable.ResultIterable object at 0x...>), ('green', <pyspark.resultiterable.ResultIterable object at 0x...>), ('blue', <pyspark.resultiterable.ResultIterable object at 0x...>), ('white', <pyspark.resultiterable.ResultIterable object at 0x...>), ('black', <pyspark.resultiterable.ResultIterable object at 0x...>), ('brown', <pyspark.resultiterable.ResultIterable object at 0x...>)]

>>> colorshexcodes.groupByKey().mapValues(len).collect()
[('red', 1), ('yellow', 1), ('green', 1), ('blue', 1), ('white', 1), ('black', 1), ('brown', 1)]

Since every key occurs only once, the reduce function is never invoked and the pairs come back unchanged
>>> colorshexcodes.reduceByKey(len).collect()
[('red', '#FF0000'), ('yellow', '#FFFF00'), ('green', '#008000'), ('blue', '#0000FF'), ('white', '#FFFFFF'), ('black', '#000000'), ('brown', '#A52A2A')]

>>> from operator import add
>>> colorshexcodes.reduceByKey(add).collect()
[('red', '#FF0000'), ('yellow', '#FFFF00'), ('green', '#008000'), ('blue', '#0000FF'), ('white', '#FFFFFF'), ('black', '#000000'), ('brown', '#A52A2A')]


>>> colorshexcodes.groupByKey().foreach(print)
('red', <pyspark.resultiterable.ResultIterable object at 0x...>)
('yellow', <pyspark.resultiterable.ResultIterable object at 0x...>)
('green', <pyspark.resultiterable.ResultIterable object at 0x...>)
('blue', <pyspark.resultiterable.ResultIterable object at 0x...>)
('white', <pyspark.resultiterable.ResultIterable object at 0x...>)
('black', <pyspark.resultiterable.ResultIterable object at 0x...>)
('brown', <pyspark.resultiterable.ResultIterable object at 0x...>)


>>> colorshexcodes.keys().collect()
['red', 'yellow', 'green', 'blue', 'white', 'black', 'brown']

>>> colorshexcodes.values().collect()
['#FF0000', '#FFFF00', '#008000', '#0000FF', '#FFFFFF', '#000000', '#A52A2A']

>>> colorshexcodes.map(lambda k: k[0]).collect()
['red', 'yellow', 'green', 'blue', 'white', 'black', 'brown']

>>> colorshexcodes.map(lambda k: (k[0],)).collect()
[('red',), ('yellow',), ('green',), ('blue',), ('white',), ('black',), ('brown',)]

>>> colorshexcodes.map(lambda k: (k[0],k[1].replace('#',''))).collect()
[('red', 'FF0000'), ('yellow', 'FFFF00'), ('green', '008000'), ('blue', '0000FF'), ('white', 'FFFFFF'), ('black', '000000'), ('brown', 'A52A2A')]

>>> colorshexcodes.map(lambda k: (k[0],k[1].replace('#',''))).flatMap(lambda f: f).collect()
['red', 'FF0000', 'yellow', 'FFFF00', 'green', '008000', 'blue', '0000FF', 'white', 'FFFFFF', 'black', '000000', 'brown', 'A52A2A']

>>> colorshexcodes.saveAsTextFile("file:/home/mkm/export/colorhexcodes")

>>> colorshexcodes.saveAsHadoopFile(
"hdfs://localhost:9000/colorshexcodes",
"org.apache.hadoop.mapred.TextOutputFormat",
"org.apache.hadoop.io.IntWritable","org.apache.hadoop.io.Text")

$ hdfs dfs -ls /colorshexcodes/
24/08/06 18:50:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 mkm supergroup          0 2024-08-06 18:49 /colorshexcodes/_SUCCESS
-rw-r--r--   1 mkm supergroup         96 2024-08-06 18:49 /colorshexcodes/part-00000

>>> colorshexcodes.take(3)
[('red', '#FF0000'), ('yellow', '#FFFF00'), ('green', '#008000')]

Order by element, default is ascending order
>>> colorshexcodes.takeOrdered(5, lambda x: x)
[('black', '#000000'), ('blue', '#0000FF'), ('brown', '#A52A2A'), ('green', '#008000'), ('red', '#FF0000')]

Descending order - a key function has to be provided; here we order by the second character of the color name, in reverse
>>> colorshexcodes.takeOrdered(3, key=lambda x: -ord(x[0][1]))
[('green', '#008000'), ('brown', '#A52A2A'), ('blue', '#0000FF')]

>>> colorshexcodes.takeOrdered(4, key=lambda x: -ord(x[0][1]))
[('green', '#008000'), ('brown', '#A52A2A'), ('blue', '#0000FF'), ('black', '#000000')]

>>> colorshexcodes.takeOrdered(6, key=lambda x: -ord(x[0][1]))
[('green', '#008000'), ('brown', '#A52A2A'), ('blue', '#0000FF'), ('black', '#000000'), ('white', '#FFFFFF'), ('red', '#FF0000')]

>>> colorshexcodes.takeOrdered(6, key=lambda x: -ord(x[0][0]))
[('yellow', '#FFFF00'), ('white', '#FFFFFF'), ('red', '#FF0000'), ('green', '#008000'), ('blue', '#0000FF'), ('black', '#000000')]

>>> colorshexcodes.takeOrdered(7, key=lambda x: -ord(x[0][0]))
[('yellow', '#FFFF00'), ('white', '#FFFFFF'), ('red', '#FF0000'), ('green', '#008000'), ('blue', '#0000FF'), ('black', '#000000'), ('brown', '#A52A2A')]


>>> colorshexcodes.takeSample(True,3)
[('brown', '#A52A2A'), ('black', '#000000'), ('green', '#008000')]

>>> colorshexcodes.takeSample(False,3)
[('white', '#FFFFFF'), ('blue', '#0000FF'), ('green', '#008000')]

>>> colorshexcodes.toDebugString()
b'(1) ParallelCollectionRDD[130] at parallelize at PythonRDD.scala:195 []'

>>> colorshexcodes.zipWithIndex().collect()
[(('red', '#FF0000'), 0), (('yellow', '#FFFF00'), 1), (('green', '#008000'), 2), (('blue', '#0000FF'), 3), (('white', '#FFFFFF'), 4), (('black', '#000000'), 5), (('brown', '#A52A2A'), 6)]

>>> colorshexcodes.zipWithIndex().flatMap(lambda x: x).collect()
[('red', '#FF0000'), 0, ('yellow', '#FFFF00'), 1, ('green', '#008000'), 2, ('blue', '#0000FF'), 3, ('white', '#FFFFFF'), 4, ('black', '#000000'), 5, ('brown', '#A52A2A'), 6]




>>> emps = sc.parallelize([(1,'Avc Ser',2300,'L1'),(2,'Kghf Jfjs',3400,'L1'),(3,'Jgf Fjf',2455,'L2'),(4,'Ejh Ruyg',3500,'L3'),(5,'Efgw Ygefrf',5400,'L4'),(6,'Fuyg Ruyoi',4020,'L4'),(7,'Gfgd Tefdf',3500,'L4')])

>>> locs = sc.parallelize([('L1','Accounts','New York'),('L2','Sales','London'),('L3','Marketing','Dubai'),('L4','RND','Mumbai')])

>>> emps.collect()
[(1, 'Avc Ser', 2300, 'L1'), (2, 'Kghf Jfjs', 3400, 'L1'), (3, 'Jgf Fjf', 2455, 'L2'), (4, 'Ejh Ruyg', 3500, 'L3'), (5, 'Efgw Ygefrf', 5400, 'L4'), (6, 'Fuyg Ruyoi', 4020, 'L4'), (7, 'Gfgd Tefdf', 3500, 'L4')]

>>> locs.collect()
[('L1', 'Accounts', 'New York'), ('L2', 'Sales', 'London'), ('L3', 'Marketing', 'Dubai'), ('L4', 'RND', 'Mumbai')]

We cannot join the employee RDD and the location RDD directly because their keys (first elements) differ.
The location code has to come first, so we build a new RDD with the location code moved to the front:
emps - (1, 'Avc Ser', 2300, 'L1'), new rdd - ('L1', 1, 'Avc Ser', 2300)
>>> emplocs = emps.map(lambda e: (e[3],e[0],e[1],e[2]))

>>> emplocs.collect()
[('L1', 1, 'Avc Ser', 2300), ('L1', 2, 'Kghf Jfjs', 3400), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 7, 'Gfgd Tefdf', 3500)]

>>> emplocs.join(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]
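Note that only element [1] of each tuple survives in the result: a pair-RDD join treats element [0] as the key and element [1] as the value, so the name and salary fields are silently dropped. The inner join itself can be sketched in pure Python (a simplified single-machine model, not Spark's shuffle):

```python
def inner_join(left, right):
    # mimic pair-RDD join: key = t[0], value = t[1]; extra tuple fields are ignored
    rmap = {}
    for t in right:
        rmap.setdefault(t[0], []).append(t[1])
    return [(t[0], (t[1], v)) for t in left for v in rmap.get(t[0], [])]

# a small assumed subset of the transcript's data
emplocs = [('L1', 1, 'Avc Ser', 2300), ('L2', 3, 'Jgf Fjf', 2455)]
locs = [('L1', 'Accounts', 'New York'), ('L2', 'Sales', 'London')]
print(inner_join(emplocs, locs))
# → [('L1', (1, 'Accounts')), ('L2', (3, 'Sales'))]
```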

Count of employees per location
>>> emplocs.join(locs).groupByKey().mapValues(len).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]

Grouping with sort - default is ascending order
>>> emplocs.join(locs).groupByKey().mapValues(len).sortByKey().collect()
[('L1', 2), ('L2', 1), ('L3', 1), ('L4', 3)]

>>> emplocs.join(locs).groupByKey().mapValues(len).sortByKey(True).collect()
[('L1', 2), ('L2', 1), ('L3', 1), ('L4', 3)]
Descending order
>>> emplocs.join(locs).groupByKey().mapValues(len).sortByKey(False).collect()
[('L4', 3), ('L3', 1), ('L2', 1), ('L1', 2)]

Sort by value of K,V paired RDD
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: r[1]).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]

Reverse sort by value of K,V paired RDD
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: r[1],False).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]

Sort by multiple values
>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: (r[1],r[0]),False).collect()
[('L4', 3), ('L1', 2), ('L3', 1), ('L2', 1)]


>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda r: (r[0],r[1]),False).collect()
[('L4', 3), ('L3', 1), ('L2', 1), ('L1', 2)]


>>> emplocs.join(locs).groupByKey().mapValues(len).groupByKey().collect()
[('L2', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L3', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L1', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L4', <pyspark.resultiterable.ResultIterable object at 0x...>)]


>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: l[1]).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]

>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: -l[1]).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]

>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (-l[1],l[0])).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]


>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (-l[1],l[0]),False).collect()
[('L3', 1), ('L2', 1), ('L1', 2), ('L4', 3)]

>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],l[0]),False).collect()
[('L4', 3), ('L1', 2), ('L3', 1), ('L2', 1)]

>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],l[0]),True).collect()
[('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)]


>>> emplocs.join(locs).groupByKey().mapValues(len).sortBy(lambda l: (l[1],-ord(l[0][0])),False).collect()
[('L4', 3), ('L1', 2), ('L2', 1), ('L3', 1)]


Joins with unmatched keys, which produce null (None) values
>>> emps = sc.parallelize([(1,'Avc Ser',2300,'L1'),(2,'Kghf Jfjs',3400,'L1'),(3,'Jgf Fjf',2455,'L2'),(4,'Ejh Ruyg',3500,'L3'),(5,'Efgw Ygefrf',5400,'L4'),(6,'Fuyg Ruyoi',4020,'L4'),(7,'Gfgd Tefdf',3500,'L4'),(8,'Herkj Gdf',2354,'')])

>>> locs = sc.parallelize([('L1','Accounts','New York'),('L2','Sales','London'),('L3','Marketing','Dubai'),('L4','RND','Mumbai'),('L5','Delivery','Singapore')])

>>> emplocs = emps.map(lambda e: (e[3],e[0],e[1],e[2]))

>>> emplocs.collect()
[('L1', 1, 'Avc Ser', 2300), ('L1', 2, 'Kghf Jfjs', 3400), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 7, 'Gfgd Tefdf', 3500), ('', 8, 'Herkj Gdf', 2354)]

>>> emplocs.join(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]

>>> emplocs.join(locs).map(list).collect()
[['L2', (3, 'Sales')], ['L3', (4, 'Marketing')], ['L1', (1, 'Accounts')], ['L1', (2, 'Accounts')], ['L4', (5, 'RND')], ['L4', (6, 'RND')], ['L4', (7, 'RND')]]

>>> emplocs.join(locs).map(len).collect()
[2, 2, 2, 2, 2, 2, 2]


>>> emplocs.join(locs).map(lambda r: (r[0], list(r[1]))).collect()
[('L2', [3, 'Sales']), ('L3', [4, 'Marketing']), ('L1', [1, 'Accounts']), ('L1', [2, 'Accounts']), ('L4', [5, 'RND']), ('L4', [6, 'RND']), ('L4', [7, 'RND'])]

>>> emplocs.join(locs).map(lambda r: (r[0], list(r[1][1]))).collect()
[('L2', ['S', 'a', 'l', 'e', 's']), ('L3', ['M', 'a', 'r', 'k', 'e', 't', 'i', 'n', 'g']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D'])]


>>> emplocs.join(locs).map(lambda r: r[0]).collect()
['L2', 'L3', 'L1', 'L1', 'L4', 'L4', 'L4']

>>> emplocs.join(locs).map(lambda r: list(r[0])).collect()
[['L', '2'], ['L', '3'], ['L', '1'], ['L', '1'], ['L', '4'], ['L', '4'], ['L', '4']]


>>> emplocs.join(locs).map(lambda r: (r[0],r[1])).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]

>>> emplocs.join(locs).map(lambda r: (r[0],r[1][1])).collect()
[('L2', 'Sales'), ('L3', 'Marketing'), ('L1', 'Accounts'), ('L1', 'Accounts'), ('L4', 'RND'), ('L4', 'RND'), ('L4', 'RND')]

>>> emplocs.join(locs).map(lambda r: (r[0],list(r[1][1]))).collect()
[('L2', ['S', 'a', 'l', 'e', 's']), ('L3', ['M', 'a', 'r', 'k', 'e', 't', 'i', 'n', 'g']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L1', ['A', 'c', 'c', 'o', 'u', 'n', 't', 's']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D']), ('L4', ['R', 'N', 'D'])]


>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0],x[1][1])).collect()
[('L2', 3, 'Sales'), ('L3', 4, 'Marketing'), ('L1', 1, 'Accounts'), ('L1', 2, 'Accounts'), ('L4', 5, 'RND'), ('L4', 6, 'RND'), ('L4', 7, 'RND')]


>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).reduceByKey(add).collect()
[('L2', 3), ('L3', 4), ('L1', 3), ('L4', 18)]


>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).collect()
[('L2', 3), ('L3', 4), ('L1', 1), ('L1', 2), ('L4', 5), ('L4', 6), ('L4', 7)]


>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).groupByKey().collect()
[('L2', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L3', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L1', <pyspark.resultiterable.ResultIterable object at 0x...>), ('L4', <pyspark.resultiterable.ResultIterable object at 0x...>)]

>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).countByKey()
defaultdict(<class 'int'>, {'L2': 1, 'L3': 1, 'L1': 2, 'L4': 3})

>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).countByKey().items()
dict_items([('L2', 1), ('L3', 1), ('L1', 2), ('L4', 3)])

>>> emplocs.join(locs).map(lambda x: (x[0],x[1][0])).groupByKey().mapValues(list).collect()
[('L2', [3]), ('L3', [4]), ('L1', [1, 2]), ('L4', [5, 6, 7])]
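groupByKey followed by mapValues(list) is the RDD analogue of grouping pairs into a dict of lists. A plain-Python sketch over the same (key, employee-id) pairs:

```python
from collections import defaultdict

# the (location, employee-id) pairs produced by the join above
pairs = [('L2', 3), ('L3', 4), ('L1', 1), ('L1', 2), ('L4', 5), ('L4', 6), ('L4', 7)]

groups = defaultdict(list)
for k, v in pairs:
    groups[k].append(v)
print(dict(groups))
# → {'L2': [3], 'L3': [4], 'L1': [1, 2], 'L4': [5, 6, 7]}
```

Replacing the list accumulation with a counter reproduces countByKey from the previous example.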

>>> emplocs.collect()
[('L1', 1, 'Avc Ser', 2300), ('L1', 2, 'Kghf Jfjs', 3400), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 7, 'Gfgd Tefdf', 3500), ('', 8, 'Herkj Gdf', 2354)]

>>> locs.collect()
[('L1', 'Accounts', 'New York'), ('L2', 'Sales', 'London'), ('L3', 'Marketing', 'Dubai'), ('L4', 'RND', 'Mumbai'), ('L5', 'Delivery', 'Singapore')]

>>> emplocs.leftOuterJoin(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('', (8, None)), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND'))]

>>> emplocs.subtract(locs).collect()
[('L1', 1, 'Avc Ser', 2300), ('L2', 3, 'Jgf Fjf', 2455), ('L3', 4, 'Ejh Ruyg', 3500), ('L4', 6, 'Fuyg Ruyoi', 4020), ('', 8, 'Herkj Gdf', 2354), ('L1', 2, 'Kghf Jfjs', 3400), ('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 7, 'Gfgd Tefdf', 3500)]


>>> emplocs.rightOuterJoin(locs).collect()
[('L2', (3, 'Sales')), ('L3', (4, 'Marketing')), ('L1', (1, 'Accounts')), ('L1', (2, 'Accounts')), ('L4', (5, 'RND')), ('L4', (6, 'RND')), ('L4', (7, 'RND')), ('L5', (None, 'Delivery'))]
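leftOuterJoin keeps every left-side key and fills in None when the right side has no match; rightOuterJoin is the mirror image. That is why ('', (8, None)) appears in the left join and ('L5', (None, 'Delivery')) in the right join above. A minimal pure-Python sketch of leftOuterJoin:

```python
def left_outer_join(left, right):
    # key = t[0], value = t[1], as in a pair-RDD join
    rmap = {}
    for t in right:
        rmap.setdefault(t[0], []).append(t[1])
    out = []
    for t in left:
        # unmatched left keys are kept, paired with None
        for v in rmap.get(t[0], [None]):
            out.append((t[0], (t[1], v)))
    return out

# a small assumed subset of the transcript's data
emplocs = [('L1', 1, 'Avc Ser', 2300), ('', 8, 'Herkj Gdf', 2354)]
locs = [('L1', 'Accounts', 'New York')]
print(left_outer_join(emplocs, locs))
# → [('L1', (1, 'Accounts')), ('', (8, None))]
```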


>>> emplocs.cartesian(locs).collect()
[(('L1', 1, 'Avc Ser', 2300), ('L1', 'Accounts', 'New York')), (('L1', 1, 'Avc Ser', 2300), ('L2', 'Sales', 'London')), (('L1', 1, 'Avc Ser', 2300), ('L3', 'Marketing', 'Dubai')), (('L1', 1, 'Avc Ser', 2300), ('L4', 'RND', 'Mumbai')), (('L1', 1, 'Avc Ser', 2300), ('L5', 'Delivery', 'Singapore')), (('L1', 2, 'Kghf Jfjs', 3400), ('L1', 'Accounts', 'New York')), (('L1', 2, 'Kghf Jfjs', 3400), ('L2', 'Sales', 'London')), (('L1', 2, 'Kghf Jfjs', 3400), ('L3', 'Marketing', 'Dubai')), (('L1', 2, 'Kghf Jfjs', 3400), ('L4', 'RND', 'Mumbai')), (('L1', 2, 'Kghf Jfjs', 3400), ('L5', 'Delivery', 'Singapore')), (('L2', 3, 'Jgf Fjf', 2455), ('L1', 'Accounts', 'New York')), (('L2', 3, 'Jgf Fjf', 2455), ('L2', 'Sales', 'London')), (('L2', 3, 'Jgf Fjf', 2455), ('L3', 'Marketing', 'Dubai')), (('L2', 3, 'Jgf Fjf', 2455), ('L4', 'RND', 'Mumbai')), (('L2', 3, 'Jgf Fjf', 2455), ('L5', 'Delivery', 'Singapore')), (('L3', 4, 'Ejh Ruyg', 3500), ('L1', 'Accounts', 'New York')), (('L3', 4, 'Ejh Ruyg', 3500), ('L2', 'Sales', 'London')), (('L3', 4, 'Ejh Ruyg', 3500), ('L3', 'Marketing', 'Dubai')), (('L3', 4, 'Ejh Ruyg', 3500), ('L4', 'RND', 'Mumbai')), (('L3', 4, 'Ejh Ruyg', 3500), ('L5', 'Delivery', 'Singapore')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L1', 'Accounts', 'New York')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L2', 'Sales', 'London')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L3', 'Marketing', 'Dubai')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L4', 'RND', 'Mumbai')), (('L4', 5, 'Efgw Ygefrf', 5400), ('L5', 'Delivery', 'Singapore')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L1', 'Accounts', 'New York')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L2', 'Sales', 'London')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L3', 'Marketing', 'Dubai')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L4', 'RND', 'Mumbai')), (('L4', 6, 'Fuyg Ruyoi', 4020), ('L5', 'Delivery', 'Singapore')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L1', 'Accounts', 'New York')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L2', 'Sales', 'London')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L3', 
'Marketing', 'Dubai')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L4', 'RND', 'Mumbai')), (('L4', 7, 'Gfgd Tefdf', 3500), ('L5', 'Delivery', 'Singapore')), (('', 8, 'Herkj Gdf', 2354), ('L1', 'Accounts', 'New York')), (('', 8, 'Herkj Gdf', 2354), ('L2', 'Sales', 'London')), (('', 8, 'Herkj Gdf', 2354), ('L3', 'Marketing', 'Dubai')), (('', 8, 'Herkj Gdf', 2354), ('L4', 'RND', 'Mumbai')), (('', 8, 'Herkj Gdf', 2354), ('L5', 'Delivery', 'Singapore'))]

>>> emplocs.cartesian(locs).count()
40
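Cartesian pairs every element of one RDD with every element of the other, so the count is simply the product of the two sizes: 8 employees × 5 locations = 40. itertools.product does the same for plain lists:

```python
from itertools import product

emps = list(range(8))   # stand-ins for the 8 employee tuples
locs = list(range(5))   # stand-ins for the 5 location tuples

pairs = list(product(emps, locs))
print(len(pairs))  # → 40
```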

Compiled on WEDNESDAY, 07-AUGUST-2024, 01:01:19 PM IST
