目录 | 上一节 (6.2 自定义迭代) | 下一节 (6.4 生成器表达式)

6.3 生产者,消费者和管道

生成器在设置各种生产者/消费者问题(producer/consumer problems)和数据流管道(pipeline)中非常有用。本节将对此进行讨论。

生产者消费者问题

生成器与各种形式的 生产者消费者 问题密切相关。

  1. # Producer
  2. def follow(f):
  3. ...
  4. while True:
  5. ...
  6. yield line # Produces value in `line` below
  7. ...
  8. # Consumer
  9. for line in follow(f): # Consumes value from `yield` above
  10. ...

yield 语句生成给 for 语句消费的值。

生成器管道

你可以使用生成器的这方面特性来设置进程管道(类似于 Unix 管道(pipe))。

producerprocessingprocessingconsumer

进程管道包括初始的数据生产者、中间的处理阶段、最后的消费者。

producerprocessingprocessingconsumer

  1. def producer():
  2. ...
  3. yield item
  4. ...

通常情况下,生产者是一个生成器,尽管也可以是其它的序列列表。yield 将数据输入管道。

producerprocessingprocessingconsumer

  1. def consumer(s):
  2. for item in s:
  3. ...

消费者是一个 for 循环,获取数据(译注:items)并对数据执行某些操作。

producerprocessingprocessingconsumer

  1. def processing(s):
  2. for item in s:
  3. ...
  4. yield newitem
  5. ...

中间的处理阶段同时消费和生产数据。它们可能修改数据流,也可能筛选数据流(丢弃数据)。

producerprocessingprocessingconsumer

  1. def producer():
  2. ...
  3. yield item # yields the item that is received by the `processing`
  4. ...
  5. def processing(s):
  6. for item in s: # Comes from the `producer`
  7. ...
  8. yield newitem # yields a new item
  9. ...
  10. def consumer(s):
  11. for item in s: # Comes from the `processing`
  12. ...

设置管道的代码如下:

  1. a = producer()
  2. b = processing(a)
  3. c = consumer(b)

你会发现数据逐渐地流向不同的函数。

练习

对于本练习,stocksim.py 程序仍需要在后台运行。并且,你将使用到上一节练习(译注:练习 6.7)编写的 follow() 函数。

练习 6.8:创建一个简单的管道

让我们来看看管道的思想。请创建下面这个函数:

  1. >>> def filematch(lines, substr):
  2. for line in lines:
  3. if substr in line:
  4. yield line
  5. >>>

filematch() 函数除了不再打开文件,几乎与上一节练习的第一个生成器示例完全相同——仅仅对作为参数给出的行序列进行操作。现在,请尝试如下操作:

  1. >>> from follow import follow
  2. >>> lines = follow('Data/stocklog.csv')
  3. >>> ibm = filematch(lines, 'IBM')
  4. >>> for line in ibm:
  5. print(line)
  6. ... wait for output ...

虽然输出可能需要一定时间才会出现,但是,最后你一定会看到包含 IBM 数据的行。

练习 6.9:创建一个复杂的管道

通过执行更多操作来进一步理解管道的思想。

  1. >>> from follow import follow
  2. >>> import csv
  3. >>> lines = follow('Data/stocklog.csv')
  4. >>> rows = csv.reader(lines)
  5. >>> for row in rows:
  6. print(row)
  7. ['BA', '98.35', '6/11/2007', '09:41.07', '0.16', '98.25', '98.35', '98.31', '158148']
  8. ['AA', '39.63', '6/11/2007', '09:41.07', '-0.03', '39.67', '39.63', '39.31', '270224']
  9. ['XOM', '82.45', '6/11/2007', '09:41.07', '-0.23', '82.68', '82.64', '82.41', '748062']
  10. ['PG', '62.95', '6/11/2007', '09:41.08', '-0.12', '62.80', '62.97', '62.61', '454327']
  11. ...

这非常有趣。你在这里可以看到, follow() 函数的输出被传递到 csv.reader()函数,并且,我们现在得到了一系列拆分的行。

练习 6.10:创建更多管道组件

让我们把这样的思想扩展到更大的管道中。首先,创建 ticker.py 文件,然后在 ticker.py 文件里面创建一个函数,像上面一样读取 CSV 文件:

  1. # ticker.py
  2. from follow import follow
  3. import csv
  4. def parse_stock_data(lines):
  5. rows = csv.reader(lines)
  6. return rows
  7. if __name__ == '__main__':
  8. lines = follow('Data/stocklog.csv')
  9. rows = parse_stock_data(lines)
  10. for row in rows:
  11. print(row)

接着,创建一个选择特定列的新函数:

  1. # ticker.py
  2. ...
  3. def select_columns(rows, indices):
  4. for row in rows:
  5. yield [row[index] for index in indices]
  6. ...
  7. def parse_stock_data(lines):
  8. rows = csv.reader(lines)
  9. rows = select_columns(rows, [0, 1, 4])
  10. return rows

再次运行程序,你应该可以看到输出缩小如下:

  1. ['BA', '98.35', '0.16']
  2. ['AA', '39.63', '-0.03']
  3. ['XOM', '82.45','-0.23']
  4. ['PG', '62.95', '-0.12']
  5. ...

再接着,创建一个生成器函数以转换数据类型并构建字典。示例:

  1. # ticker.py
  2. ...
  3. def convert_types(rows, types):
  4. for row in rows:
  5. yield [func(val) for func, val in zip(types, row)]
  6. def make_dicts(rows, headers):
  7. for row in rows:
  8. yield dict(zip(headers, row))
  9. ...
  10. def parse_stock_data(lines):
  11. rows = csv.reader(lines)
  12. rows = select_columns(rows, [0, 1, 4])
  13. rows = convert_types(rows, [str, float, float])
  14. rows = make_dicts(rows, ['name', 'price', 'change'])
  15. return rows
  16. ...

再次运行程序,你应该能够看到像下面这样的字典流:

  1. { 'name':'BA', 'price':98.35, 'change':0.16 }
  2. { 'name':'AA', 'price':39.63, 'change':-0.03 }
  3. { 'name':'XOM', 'price':82.45, 'change': -0.23 }
  4. { 'name':'PG', 'price':62.95, 'change':-0.12 }
  5. ...

练习 6.11:筛选数据

创建一个筛选数据的函数。示例:

  1. # ticker.py
  2. ...
  3. def filter_symbols(rows, names):
  4. for row in rows:
  5. if row['name'] in names:
  6. yield row

使用该函数可以筛选出投资组合中的股票:

  1. import report
  2. portfolio = report.read_portfolio('Data/portfolio.csv')
  3. rows = parse_stock_data(follow('Data/stocklog.csv'))
  4. rows = filter_symbols(rows, portfolio)
  5. for row in rows:
  6. print(row)

练习 6.12:整合所有的代码

请在 ticker.py 文件中编写函数 ticker(portfile, logfile, fmt) ,该函数根据给定的投资组合、日志文件和表格格式创建实时的股票报价器。示例:

  1. >>> from ticker import ticker
  2. >>> ticker('Data/portfolio.csv', 'Data/stocklog.csv', 'txt')
  3. Name Price Change
  4. ---------- ---------- ----------
  5. GE 37.14 -0.18
  6. MSFT 29.96 -0.09
  7. CAT 78.03 -0.49
  8. AA 39.34 -0.32
  9. ...
  10. >>> ticker('Data/portfolio.csv', 'Data/stocklog.csv', 'csv')
  11. Name,Price,Change
  12. IBM,102.79,-0.28
  13. CAT,78.04,-0.48
  14. AA,39.35,-0.31
  15. CAT,78.05,-0.47
  16. ...

讨论

心得体会:你可以创建各种生成器函数,并把它们链接在一起执行涉及数据流的管道处理。另外,你可以创建一个函数,把一系列的管道阶段打包到一个单独的函数中调用(例如 parse_stock_data() 函数)。

目录 | 上一节 (6.2 自定义迭代) | 下一节 (6.4 生成器表达式)