Python版本
def processCallSigns(signs):
    """Fetch and decode the QSO JSON document for each call sign.

    Args:
        signs: iterable of call-sign values; each one is formatted into
            the URL template "http://73s.com/qsos/%s.json".

    Returns:
        The result of ``filter`` over ``(url, parsed_json)`` pairs, keeping
        only pairs whose parsed JSON is not ``None``. Note that the first
        element of each pair is the request URL, not the bare call sign.
    """
    # One connection pool is created per call and shared by every request.
    http = urllib3.PoolManager()
    # Build the lookup URL for each call sign.
    urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
    # Pair each URL with the response of a GET request to it.
    requests = map(lambda x: (x, http.request('GET', x)), urls)
    # Decode each response body as JSON, keeping the URL as the key.
    result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # Drop entries whose decoded body is None (e.g. a JSON `null` document).
    return filter(lambda x: x[1] is not None, result)
def fetchCallSigns(input):
    """Run processCallSigns over ``input`` via its ``mapPartitions`` method.

    Args:
        input: object exposing ``mapPartitions(func)`` (presumably a Spark
            RDD of call signs -- confirm against the caller).

    Returns:
        Whatever ``input.mapPartitions`` returns for the supplied function.
    """
    # The function receives a batch of call signs at a time, so
    # processCallSigns builds one connection pool per batch.
    return input.mapPartitions(lambda callSigns: processCallSigns(callSigns))
# Apply the lookup to validSigns (defined outside this excerpt) and keep the result in cons.
cons = fetchCallSigns(validSigns)
Scala版本
// Looks up the call log for every distinct call sign in validSigns.
// The work is done inside mapPartitions so the mapper and the HTTP client are
// built once per invocation of the partition function instead of once per sign.
// NOTE(review): validSigns is presumably a Spark RDD of call signs -- confirm.
val contactsCon = validSigns.distinct().mapPartitions { signs =>
  val mapper = createMapper()
  // NOTE(review): the client is started here but is neither passed to the helpers
  // below nor stopped in this block -- confirm how the helpers obtain it.
  val client = new HttpClient()
  client.start()
  signs
    // Build the request for each call sign; the helper yields a (sign, exchanger) pair.
    .map(sign => createExchangerForSing(sign))
    // Read the call log carried by each exchanger, keeping the sign as the key.
    .map { case (sign, exchanger) => (sign, readExchangerCallLog(mapper, exchanger)) }
    // Drop signs for which no call log could be read.
    .filter(x => x._2 != null)
}