Python版本

  1. def processCallSigns(signs):
  2. http = urllib3.PoolManager()
  3. urls = map(lambda x: "http://73s.com/qsos/%s.json"%x,signs)
  4. requests = map(lambda x:(x,http.request('GET',x)),urls)
  5. result = map(lambda x:(x[0],json.loads(x[1].data)),requests)
  6. return filter(lambda x: x[1] is not None,result)
  7. def fetchCallSigns(input):
  8. return input.mapPartitions(lambda callSigns:processCallSigns(callSigns))
  9. cons = fetchCallSigns(validSigns)

Scala版本

  1. val contactsCon = validSigns.distinct().mapPartitions{
  2. signs =>
  3. val mapper = createMapper()
  4. val client = new HttpClient()
  5. client.start()
  6. signs.map{
  7. sign => createExchangerForSing(sign)
  8. }.map{case (sign,exchanger0 => (sign,readExchangerCallLog(mapper,exchanger))}.filter(x => x._2 !=null)
  9. }