Python版本
def processCallSigns(signs):
    """Fetch the QSO JSON document for every call sign in one partition.

    A single ``urllib3.PoolManager`` is created per call, so all lookups for
    the partition go through the same connection pool.

    :param signs: iterable of call-sign strings.
    :return: iterable of ``(url, parsed_json)`` pairs; entries whose parsed
        body is ``None`` are dropped. Note that the first element of each pair
        is the request URL, not the original call sign.
    """
    # One pool per partition rather than one connection per element.
    http = urllib3.PoolManager()
    # Build the lookup URL for each call sign.
    urls = map(lambda sign: "http://73s.com/qsos/%s.json" % sign, signs)
    # Pair every URL with the response of its GET request.
    requests = map(lambda url: (url, http.request('GET', url)), urls)
    # Decode each response body as JSON, keeping the URL as the key.
    result = map(lambda pair: (pair[0], json.loads(pair[1].data)), requests)
    # json.loads yields None for a JSON `null` body; discard those entries.
    return filter(lambda pair: pair[1] is not None, result)


def fetchCallSigns(input):
    """Run ``processCallSigns`` once per partition of ``input``.

    :param input: object exposing ``mapPartitions`` (presumably a Spark RDD of
        call-sign strings -- confirm against the caller).
    :return: whatever ``input.mapPartitions`` returns for the per-partition
        lookup.
    """
    return input.mapPartitions(lambda callSigns: processCallSigns(callSigns))


# `validSigns` is defined outside this snippet.
cons = fetchCallSigns(validSigns)
Scala版本
// Look up the call log for every distinct call sign, one partition at a time.
// The mapper and the HTTP client are created once per partition instead of
// once per element.
// NOTE(review): `client` is started but never referenced again or stopped in
// this block -- confirm whether createExchangerForSing is expected to use it.
val contactsCon = validSigns.distinct().mapPartitions { signs =>
  val mapper = createMapper()
  val client = new HttpClient()
  client.start()
  signs
    // Defined elsewhere; the pattern below relies on it yielding a
    // (sign, exchanger) pair.
    .map { sign => createExchangerForSing(sign) }
    // Read the call log out of each exchanger, keeping the call sign as key.
    .map { case (sign, exchanger) => (sign, readExchangerCallLog(mapper, exchanger)) }
    // Drop call signs for which no call log was produced.
    .filter(x => x._2 != null)
}