Sample.txt
Requirements:
1. Separate the valid SSNs from the invalid SSNs.
2. Count the number of valid SSNs.
402-94-7709 
283-90-3049
124-01-2425
1231232
088-57-9593
905-60-3585
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN-indexed data is very common and is stored in many file systems. The trick to speeding up the processing on Spark is to build a numerical key for each record and use the sortByKey operator. In addition, an accumulator provides a shared variable that tasks running on different machines in a cluster can all add to, which is especially useful for counting data.

Single machine solution

#!/usr/bin/env python
# coding=utf-8
def ssn_key(line):
    """Return the nine-digit integer key of an SSN line, or None if invalid.

    ``line`` is a raw line as read from the binary-mode input file (bytes on
    Python 3, a plain ``str`` on Python 2).  Surrounding whitespace is
    ignored.  Two spellings are accepted: nine plain digits (``257581087``)
    and the dashed ``AAA-GG-SSSS`` form (``402-94-7709``).  Both spellings of
    the same number yield the same key.
    """
    text = line.strip()
    if len(text) == 9 and text.isdigit():
        return int(text)
    parts = text.split(b'-')
    # Compare a real list of lengths: on Python 3 ``map`` returns an iterator
    # that never equals a list.  Each group must also be all digits, otherwise
    # int() below would raise ValueError on input such as "abc-de-fghi".
    if [len(p) for p in parts] == [3, 2, 4] and all(p.isdigit() for p in parts):
        return 1000000 * int(parts[0]) + 10000 * int(parts[1]) + int(parts[2])
    return None


def main(in_path='sample.txt', bad_path='sample_bad.txt',
         sorted_path='sample_sorted.txt'):
    """Split ``in_path`` into invalid and valid SSNs and count the valid ones.

    Invalid lines are written (stripped) to ``bad_path`` in input order.
    Valid SSNs are de-duplicated by numeric key, keeping the first spelling
    seen, and written to ``sorted_path`` in ascending key order.

    Returns the number of distinct valid SSNs, which is also printed.
    """
    first_seen = {}
    # Binary mode keeps the bytes exactly as they are in the input file and
    # avoids any dependence on the platform's default text encoding.
    with open(in_path, 'rb') as infile, open(bad_path, 'wb') as bad_file:
        for raw in infile:
            text = raw.strip()
            key = ssn_key(text)
            if key is None:
                bad_file.write(text + b'\n')
            elif key not in first_seen:
                first_seen[key] = text

    with open(sorted_path, 'wb') as sorted_file:
        for key in sorted(first_seen):
            sorted_file.write(first_seen[key] + b'\n')

    valid_cnt = len(first_seen)
    print(valid_cnt)
    return valid_cnt


if __name__ == '__main__':
    main()

Cluster solution

#!/usr/bin/env python
# coding=utf-8
import pyspark
# Driver-side Spark context, constructed with no arguments; the rest of the
# script creates its RDDs and its accumulator through it.
sc = pyspark.SparkContext()
# Accumulator starting at 0. set_key() adds 1 to it each time it is called.
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    """Return True if line ``l`` holds a well-formed SSN, else False.

    Surrounding whitespace is ignored.  Two spellings are accepted: nine
    plain digits (``257581087``) and the dashed ``AAA-GG-SSSS`` form
    (``402-94-7709``).  Only the ASCII digits 0-9 count as digits, so every
    accepted line can be converted with ``int()`` afterwards.
    """
    text = l.strip()
    if len(text) == 9:
        groups = [text]
    else:
        groups = text.split('-')
        # Compare a real list of lengths: on Python 3 ``map`` returns an
        # iterator that never equals a list.
        if [len(g) for g in groups] != [3, 2, 4]:
            return False
    # Check the characters explicitly: the dashed form was previously not
    # digit-checked at all, and str.isdigit() accepts characters such as a
    # superscript two that int() rejects.
    return all(ch in '0123456789' for g in groups for ch in g)

def set_key(l):
    """Pair an SSN line with its nine-digit integer sort key.

    ``l`` is expected to have passed is_validSSN() already.  Returns
    ``(key, ssn)``, where ``ssn`` is the stripped line and ``key`` is the
    number formed by its nine digits.

    Side effect: adds 1 to the module-level ``valid_cnt`` on every call.
    """
    # NOTE(review): valid_cnt is a Spark accumulator and this function runs
    # inside a map() transformation.  Spark documents that accumulator
    # updates made in transformations may be applied more than once when a
    # task or stage is re-executed, so this tally can overcount.
    global valid_cnt
    valid_cnt += 1
    ssn = l.strip()
    if len(ssn) == 9:
        # Plain nine-digit spelling: the text itself is the key.
        key = int(ssn)
    else:
        # Dashed spelling: concatenate the three groups numerically.
        pieces = ssn.split('-')
        key = 1000000 * int(pieces[0]) + 10000 * int(pieces[1]) + int(pieces[2])
    return (key, ssn)

# Driver: split the input into invalid and valid SSNs, sort the valid ones by
# numeric key, print both groups with the valid count, and save both groups.
try:
    # Strip once up front so that trailing whitespace cannot make two copies
    # of the same SSN look like different records.
    rdd = sc.textFile('sample.txt').map(lambda x: x.strip())

    # Both RDDs feed two actions (collect and saveAsTextFile); cache them so
    # the lineage is not recomputed for the second action.
    rdd1 = rdd.filter(lambda x: not is_validSSN(x)).cache()

    # De-duplicate on the numeric key rather than on the raw text, so the
    # plain and dashed spellings of one SSN collapse into a single record.
    # min() picks the surviving spelling deterministically.
    rdd2 = rdd.filter(is_validSSN) \
        .map(set_key) \
        .reduceByKey(min) \
        .sortByKey() \
        .map(lambda x: x[1]) \
        .cache()

    for x in rdd1.collect():
        print('Invalid SSN\t ' + x)

    valid = rdd2.collect()
    for x in valid:
        print('valid SSN\t ' + x)

    # Count the collected result instead of reading the valid_cnt
    # accumulator.  set_key() bumps that accumulator inside a map()
    # transformation, and that map is re-run by sortByKey()'s sampling pass
    # and by every action, so the accumulator ends up inflated.
    print('\nNumber of valid SSN is {}'.format(len(valid)))

    # Save RDD to file system
    rdd1.saveAsTextFile('sample_bad')
    rdd2.saveAsTextFile('sample_sorted')
finally:
    # Release the Spark context even if one of the jobs above fails.
    sc.stop()