持续集成系统

Malini Das

Malini Das 是一个致力于改善编码速度(当然是在保证代码安全的前提下),并不断寻求交叉编程的解决方案的软件工程师。她曾以工具工程师的身份供职于Mozilla,现在她在Twitch工作。可以通过关注Malini的Twitter或是她的博客来了解她的最新动态。

什么是持续集成系统?

在软件开发的过程中,我们希望能保证每一个新功能都能稳定的实现,每一个Bug都能按照预期得到修复。通常来讲这种方式就是对代码进行测试。多数情况下,开发人员会在开发环境中直接进行测试来确保功能实现的完整和稳定,很少有人会有时间在每一种可能的运行环境中都进行测试。此外,随着开发的不断进行,需要进行的测试不断的增加,在开发环境中对代码进行完全的测试的可行性也随之变得越来越低。持续集成系统的出现,正是为了解决这种开发中的困境。

持续集成(CI)系统是专门用来对新代码进行测试的系统。持续集成系统的职责就是在每次一段新的代码被提交时,确保这些新代码不会导致之前测试样例的失败。为此,持续集成系统就必须做到自动获取到最新的代码改动,自动完成测试,并生成测试报告。同时,持续集成系统还需要保证良好的稳定性。也就是说,当系统的任何一部分出现错误甚至崩溃时,整个系统应该可以从上一次中断的地方重新恢复运行。

这个系统同样需要均衡负载的能力,这样一来当提交新版本的时间比运行测试的时间还要短的时候,我们仍然可以保证在一个合理的时间内获得测试的结果。这一点我们可以通过多节点任务分发及并行化运行测试样例来实现。本项目中将介绍一个小型可拓展的极简分布式持续集成系统。

注意事项及相关说明

在本项目中使用Git作为进行测试的代码托管系统。我们只会调用标准的代码管理指令,所以,如果你并不熟悉Git的操作,但对于使用其他像svn或者Mercurial这样的版本控制系统(VCS)很熟悉,那么你完全可以基于其他的版本控制系统来完成我们的教程。

由于代码长度和单元测试的限制,我简化了测试样例搜索的机制。我们将运行代码库中tests目录下的测试样例。

通常来讲,持续集成系统监听的应该是远程代码托管库的变化。但是为了方便起见,在我们的示例之中,我们选择监听本地的代码库文件来代替远程文件。

持续集成系统并不一定按照固定的时间表运行。你也可以设定成每一次或几次提交时自动运行。在我们的例子中,我们将CI设定为定期运行。也就是说,如果我们设定CI系统设定为5秒钟运行一次,那么每隔5秒系统就会对5秒内最近的一次的提交进行测试。注意,不论这5秒内发生了多少次提交,系统都只会对最后一次提交的结果进行一次测试。

CI系统旨在监听代码库中的变化。在实际中使用的CI系统可以通过代码库的通知来获取提交信息。例如,Github提供的“post-commit hooks”。它向指定的URL发送通知, 监听这个URL的Web服务器将调用CI系统的“代码库观测器”来响应该通知。但是这种模型在我们本地的试验环境中太过复杂了,所以我们使用了观察者模型。在这种模型中系统会主动检测代码变化,而不需要等待代码管理库的通知。

为了可以查看测试运行器所提交的测试结果,CI系统还需要一个报告组件(比如一个网页),来展示测试报告。为简单起见,本项目直接将测试结果以文件的形式存储在调度程序运行的机器本地。

注意,CI系统的框架有非常多种,在我们的项目中,只是讨论了众多CI系统框架中的一种。在这种框架中,我们将我们的项目简化成了三个主要组成部分。

引言

最基础的持续集成系统分为三个部分:监听器,测试样例调度器,和测试运行器首先监听器会时刻监视代码库的变化。当发生提交时,监听器会通知调度器。之后,样例调度器会将对应提交版本号的测试分配到测试运行器中去完成

这三部分的组合方式有很多。我们可以将他们全部运行在一台电脑的同一个线程之中。但是这样一来,我们的CI系统就会缺少了处理大负载的能力,当提交带来的测试比CI系统能够同时完成的测试还要多时,就会非常容易引起任务的积压。同时这种方案的容错率非常低,一旦运行该系统的计算机发生故障或是断电,没有回退系统,中断的测试就没有办法重新运行了。理想情况下,我们的CI系统应该根据需求尽可能同时的完成多项测试工作,并且在机器发生意外停机时可以尽可能的补救。

要构建一个负载能力强并且容错率又高的CI系统,就需要保证项目中上述的每一个组件都能以独立的进程运行。各部分都独立于其他的部分,并且每一部分可以同时运行多个实例。当有很多测试任务需要同时展开时,这种方案会带来非常大的便利。我们可以并行生成多个测试运行器,每个测试运行器独立工作,这样就可以有效的避免测试队列积压的问题。

在本项目中这些组件虽然是相互独立的运行在单独的线程上,但是线程之间可以通过套接字进行通信,这样我们就可以在一个网络内的不同主机上单独运行这些进程。我们会为每一个进程分配一个地址/端口,这样每个进程之间就可以通过向各自的地址发送消息来互相通信。

通过分布式的架构,我们可以做到在硬件发生错误动态的进行处理。我们可以把监听器,测样例调度器,和测试运行器分别运行在不同的机器上,他们可以通过网络保持相互通信。当他们之中的任何一个发生问题时,我们可以安排一台新的主机上线运行发生问题的进程。这样一来这个系统就会有非常高的容错率。

在本项目中,并没有包含自动恢复任务的代码。自动恢复的功能依赖于你使用的分布式系统的架构。在实际的使用中,CI系统通常运行在支持故障转移冗余(即,当分布式系统中的一个机器发生故障,任务会自动回退到该机型的备用机上)的分布式系统之中。

为了方便测试我们的系统,在本项目中我们将会在本地手动的触发一些进程来模拟分布式的环境。

项目的文件结构

项目中每个组件的Python文件结构如下:代码库监听器( repo_observer.py ),测试样例调度器( dispatcher.py )和测试运行器( test_runner.py )。上述每个线程之间通过套接字通信,我们将用于实现通信功能的代码统一的放在 helpers.py 中。这样就可以让每个组件直接从这个文件中导入相关功能,而不用再每个组件中重复的写这段代码。

另外,我们还用到了bash脚本。这些脚本用来执行一些简单的bash和git的操作,直接通过bash脚本要比利用Python提供的系统级别的模块(比如,os或者subprocess之类的)要更方便一些。

最后,我们还建立了一个tests目录来存放我们需要CI系统运行的测试样例。在这个目录中包含两个用于测试的样例,其中一个样例模拟了样例通过时的情况,另一个则模拟了失败时的情况。

初始设置

虽然我们的CI系统是为分布式的运行而设计的,但我们会先在一台计算机上本地运行所有内容,这样我们就可以专注于掌握CI系统的工作原理,而不必受可能出现的各种网络问题的干扰。当然,如果你想要试一试分布式的运行环境,你也可以将每一个组件分别运行到不同的主机上。

持续集成系统通过监听代码的变动来触发测试,所以在开始之前我们需要设置一个用于监听的代码库。

我们称这个用于测试的项目为test_repo

  1. $ mkdir test_repo
  2. $ cd test_repo
  3. $ git init

这将是我们的代码库。开发人员的代码会提交到这个代码库中,我们的CI系统会拉取此代码库的代码并检查提交,然后运行测试。在我们的系统中代码库监听器模块会完成监听新的代码提交的工作。

监听器模块通过检查commit(提交)来进行代码更新的监听,所以我们至少需要一次的commit才能进行监听器模块的测试。让我们把测试样例的代码先提交到代码库中,以便我们执行这些测试。

将tests文件夹复制到test_repo中,然后提交:

  1. $ cp -r /this/directory/tests /path/to/test_repo/
  2. $ cd /path/to/test\_repo
  3. $ git add tests/
  4. $ git commit -m add tests

现在,在我们测试用的代码仓库中的master分支上有了一次可以用来测试的提交。

监听器模块需要一份单独的代码副本来检测新的提交。让我们为主代码库创建一个副本,并将其命名为test_repo_clone_obs

  1. $ git clone /path/to/test_repo test_repo_clone_obs

测试运行器也需要一份自己的代码副本来提取用于测试样例代码。让我们再创建另一个主代码库的副本,并将其命名为test_repo_clone_runner

  1. $ git clone /path/to/test_repo test_repo_clone_runner

组件

代码库监听器( repo_observer.py

监听器的任务是监听代码库中的改动,并在发现改动是通知测试样例分配器。为了保证我们的CI系统与各种版本控制系统(并不是所有的VCS都有内置的通知系统)都能够兼容,我们设定CI系统定时检查代码库是否有新的提交,而不是等待VCS在代码提交时发送通知。

监听器会定时轮询代码库,当有新的提交时,监听器会向分配器推送需要运行测试的代码的版本ID。监听器的轮询过程是:首先,在监听器的储存空间中得到当前的提交版本;其次,将本地库更新至这个版本;最后,将这个版本与远程库最近一次的提交ID进行比对。这样,监听器中本地的当前版本与远程的最新版本不一致时就判定为发生了新的提交。在我们的CI系统中,监听器只会向分配器推送最近的一次提交。这意味着,如果在一次的轮询周期内发生了两次提交,监听器只会为最近的一次运行测试。通常来讲,CI系统会为自上一次更新以来的每一次的提交运行相应的测试。但是为了简单起见,这次我们搭建的CI系统采取了仅为最后一次提交运行测试的方案。

监听器必须清楚自己监听的到底是哪一个代码库,我们之前已经在/path/to/test_repo_clone_obs建立了一份用于监听的代码拷贝。我们的监听器会使用这份拷贝进行检测。为了监听器能够找到这份拷贝,我们在调用repo_observer.py时会传入这一份代码拷贝的路径。监听器会以这份拷贝为本地代码库从主仓库中拉取最新的代码。

监听器会利用这份拷贝从主仓库中拉取最新的代码。在运行监听器时,可以通过命令行参数--dispatcher-server来传递分配器的地址。如果不手动传入地址,分配器的默认地址取值为:localhost:8888

  1. def poll():
  2. parser = argparse.ArgumentParser()
  3. parser.add_argument("--dispatcher-server",
  4. help="dispatcher host:port, " \
  5. "by default it uses localhost:8888",
  6. default="localhost:8888",
  7. action="store")
  8. parser.add_argument("repo", metavar="REPO", type=str,
  9. help="path to the repository this will observe")
  10. args = parser.parse_args()
  11. dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

当运行监听器脚本时,会直接从poll() 函数开始运行。这个函数会解析命令行的参数,并开始一个无限循环。这个while循环会定期的检查代码库的变化。这个循环中所做的第一个工作就是运行Bash脚本update_repo.sh 1

  1. while True:
  2. try:
  3. # call the bash script that will update the repo and check
  4. # for changes. If there's a change, it will drop a .commit\_id file
  5. # with the latest commit in the current working directory
  6. subprocess.check\_output(["./update\_repo.sh", args.repo])
  7. except subprocess.CalledProcessError as e:
  8. raise Exception("Could not update and check repository. " \+
  9. "Reason: %s" % e.output)

update_repo.sh用于识别新的提交并通知监听器。它首先记录当前所在的提交ID,然后拉取最新的代码,接着检查最新的提交ID。如果当前的版本ID与最新的匹配,说明代码没有变动,所以监听器不会作出任何响应。但是,如果提交ID间存在不同,就意味着有新了新的提交。这时,update_repo.sh将创建一个名为.commit_id的文件来记录最新的提交ID。

update_repo.sh的细分步骤如下:首先,我们的脚本会执行run_or_fail.sh文件,该文件提供了一些基于shell脚本的辅助函数。通过这些函数我们可以运行指定的脚本并可以在运行出错时输出错误信息。

!/bin/bash

source run_or_fail.sh

接下来,我们的脚本会试图删除.commit_id文件。 因为repo_observer.py会再无限循环中不断的调用updaterepo.sh ,如果在上一次的调用中产生了.commit_id 文件,并且其中储存的版本ID我们在上一次轮询中已经完成了测试,就会造成混乱。所以我们在每次都会先删除上一次的.commit_id文件,以免产生混乱。

bash rm -f .commit_id

在删除了文件之后(在文件已经存在的情况下),脚本会检查我们监听的代码库是否存在,再把.commit_id更新到最近的一次提交,保证.commit_id文件与代码库提交ID之间的同步。

run_or_fail "Repository folder not found!" pushd $1 1> /dev/null run_or_fail "Could not reset git" git reset --hard HEAD

再之后,读取git的日志,将其中最后一次的提交ID解析出来。

COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1) if [ $? != 0 ]; then echo "Could not call 'git log' on repository" exit 1 fi COMMIT_ID=echo $COMMIT | awk '{ print $2 }'

接下来,拉取代码库,获取最近所有的更改,并得到最新的提交ID。

run_or_fail "Could not pull from repository" git pull COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1) if [ $? != 0 ]; then echo "Could not call 'git log' on repository" exit 1 fi NEW_COMMIT_ID=echo $COMMIT | awk '{ print $2 }'

最后,如果新得到的提交ID与上一次的ID不匹配,我们就知道在两次轮询间发生了新的提交,所以我们的脚本应该将新的提交ID储存在.commit_id文件中。

if the id changed, then write it to a file

if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then popd 1> /dev/null echo $NEW_COMMIT_ID > .commit_id fi

当repo_observer.py运行update_repo.sh完之后,监听器会检查.commit_id是否存在。如果文件存在,我们就知道在上一次的轮询后又发生了新的提交,我们需要通知测试样例调度器来开始测试。监听器会通过连接并发送一个'status'请求来检查调度器服务的运行状态,以保证它处在可以正常接受指令的状态正常工作状态。

  1. if os.path.isfile(".commit\_id"):
  2. try:
  3. response = helpers.communicate(dispatcher\_host,
  4. int(dispatcher\_port),
  5. "status")
  6. except socket.error as e:
  7. raise Exception("Could not communicate with dispatcher server: %s" % e)

如果它以“OK”响应,则监听器会打开.commit_id文件,读取最新的提交ID并使用dispatch:请求将该ID发送给调度程序,监听器会每个5秒发送一次指令。如果发生任何错误,监听器同样会每隔5s进行一次重试。

  1. if response == "OK":
  2. commit = ""
  3. with open(".commit\_id", "r") as f:
  4. commit = f.readline()
  5. response = helpers.communicate(dispatcher\_host,
  6. int(dispatcher\_port),
  7. "dispatch:%s" % commit)
  8. if response != "OK":
  9. raise Exception("Could not dispatch the test: %s" %
  10. response)
  11. print "dispatched!"
  12. else:
  13. raise Exception("Could not dispatch the test: %s" %
  14. response)
  15. time.sleep(5)

如果你不使用KeyboardInterrupt (Ctrl + c)终止监听器发送进程或发送终止信号,监听器会永远重复这一操作。

测试样例分配器( dispatcher.py )

测试样例分配器是一个用来为测试运行器分配测试任务的独立进程。他会在一个指定端口监听来自代码库监听器及测试运行器的请求。分配器允许测试运行器主动注册,当监听器发送一个提交ID时,它会将测试工作分配给一个已经注册的测试运行器。同时,它还可以平稳的处理测试运行器遇到的各种问题,当一个运行器发生故障,它可以立即将该运行器运行测试的提交ID重新分配给一个新的测试运行器。

dispatch.py 脚本以serve函数为入口。首先,它会解析你设定的分配器的地址及端口:

def serve(): parser = argparse.ArgumentParser() parser.add_argument("--host", help="dispatcher's host, by default it uses localhost", default="localhost", action="store") parser.add_argument("--port", help="dispatcher's port, by default it uses 8888", default=8888, action="store") args = parser.parse_args()

这里我们会开启分配器进程以及一个runner_checker函数进程,和一个redistribute函数进程。

  1. server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
  2. print `serving on %s:%s` % (args.host, int(args.port))
  3. ...
  4. runner\_heartbeat = threading.Thread(target=runner\_checker, args=(server,))
  5. redistributor = threading.Thread(target=redistribute, args=(server,))
  6. try:
  7. runner\_heartbeat.start()
  8. redistributor.start()
  9. # Activate the server; this will keep running until you
  10. # interrupt the program with Ctrl\+C or Cmd\+C
  11. server.serve\_forever()
  12. except (KeyboardInterrupt, Exception):
  13. # if any exception occurs, kill the thread
  14. server.dead = True
  15. runner\_heartbeat.join()
  16. redistributor.join()

runner_checker函数会定期的ping每一个注册在位的运行器,来确保他们都处于正常工作的状态。。如果有运行器没有响应,该函数就会将其从注册的运行器池中删除,并且之前分配给他的提交ID会被重新分配给一个新的可用的运行器。函数会在pending_commits变量中记录运行受到运行器失去响应影响的提交ID

  1. def runner\_checker(server):
  2. def manage\_commit\_lists(runner):
  3. for commit, assigned\_runner in server.dispatched\_commits.iteritems():
  4. if assigned\_runner == runner:
  5. del server.dispatched\_commits[commit]
  6. server.pending\_commits.append(commit)
  7. break
  8. server.runners.remove(runner)
  9. while not server.dead:
  10. time.sleep(1)
  11. for runner in server.runners:
  12. s = socket.socket(socket.AF\_INET, socket.SOCK\_STREAM)
  13. try:
  14. response = helpers.communicate(runner["host"],
  15. int(runner["port"]),
  16. "ping")
  17. if response != "pong":
  18. print "removing runner %s" % runner
  19. manage\_commit\_lists(runner)
  20. except socket.error as e:
  21. manage\_commit\_lists(runner)

redistribute函数用来将pending_commits中记录的提交ID进行分发。当redistribute运行时,它会不断的检查pending_commits中是否有提交ID。一旦发现pending_commits中存在提交ID,函数会调用dispatch_tests方法来将提交ID分配出去。

  1. def redistribute(server):
  2. while not server.dead:
  3. for commit in server.pending\_commits:
  4. print "running redistribute"
  5. print server.pending\_commits
  6. dispatch\_tests(server, commit)
  7. time.sleep(5)

dispatch_tests函数用来从已注册的运行器池中返回一个可用的运行器。如果得到了一个可用的运行器,函数会发送一个带有提交ID的运行测试指令。如果当前没有可用的运行器,函数会在2s的休眠之后重复上述过程。如果分配成功了,函数会在dispatched_commits变量中记录提交ID及该提交ID的测试正在由哪一个运行器运行。如果提交ID在pending_commits变量中,dispatch_tests函数会在重新分配后将提交ID从pending_commits中删除。

def dispatch_tests(server, commit_id):

  1. # NOTE: usually we don't run this forever
  2. while True:
  3. print "trying to dispatch to runners"
  4. for runner in server.runners:
  5. response = helpers.communicate(runner["host"],
  6. int(runner["port"]),
  7. "runtest:%s" % commit\_id)
  8. if response == "OK":
  9. print "adding id %s" % commit\_id
  10. server.dispatched\_commits[commit\_id] = runner
  11. if commit\_id in server.pending\_commits:
  12. server.pending\_commits.remove(commit\_id)
  13. return
  14. time.sleep(2)

分配器服务用到了标准库中的一个叫SocketServer模块,它是一个非常简单的服务器。SocketServer模块中有四种基本的服务器类型: TCP , UDP , UnixStreamServer和UnixDatagramServer 。为了保证我们的数据传输连续稳定,我们使用基于TCP协议的套接字(UPD并不能保证数据的稳定和连续)。

SocketServer中提供的默认的TCPServer同时只能处理一个请求,因此当分配器与一个运行器建立会话后,就无法再同时与监听器建立连接了。此时来自监听器的会话只能等待第一个会话完成并断开连接才能建立与分配器的连接。这对于我们的项目而言并不是非常理想,在我们预想中,分配器应该直接而迅速的同时与所有运行器及监听器进行通信。

为了使我们的分配器可以同时维护多个连接,我们使用了一个自定义的类ThreadingTCPServer来为默认的SocketServer 类增加多线程运行的功能。也就是说无论何时分配器接收到连接请求,他都会新建一个进程来处理这个会话。这就使分配器同时维护多个连接成为了可能。

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): runners = [] # Keeps track of test runner pool dead = False # Indicate to other threads that we are no longer running dispatched_commits = {} # Keeps track of commits we dispatched pending_commits = [] # Keeps track of commits we have yet to dispatch

分配器会为每一个请求定义了处理函数。我们通过DispatcherHandler类定义来实现这种功能。DispatcherHandler继承自SocketServer的BaseRequestHandler 基类。这个基类只需要我们定义句柄函数,只要请求连接它就会被调用。我们将这个函数的自定义内容写在DispatcherHandler中,并且确保在每一次请求出现是,这个函数能够被调用。这个函数会不断地监听发来的请求( self.request中保存着请求信息),并解析请求中的指令。

class DispatcherHandler(SocketServer.BaseRequestHandler): """ The RequestHandler class for our dispatcher. This will dispatch test runners against the incoming commit and handle their requests and test results """ command_re = re.compile(r"(\w+)(:.+)*") BUF_SIZE = 1024 def handle(self): self.data = self.request.recv(self.BUF_SIZE).strip() command_groups = self.command_re.match(self.data) if not command_groups: self.request.sendall("Invalid command") return command = command_groups.group(1)

这个函数可以处理如下指令: status , register , dispatch和results 。其中 status函数用来检测分配器服务是否处于运行状态。

  1. if command == "status":
  2. print "in status"
  3. self.request.sendall("OK")

为了让分配器的功能生效,我们需要注册至少一个运行器。当注册器被调用时,为了确保在需要发送提交ID触发测试时能准确的找到对应的运行器,会在一个列表中保存下运行器的“地址:端口”数据(运行器的数据会被保存在一个叫ThreadingTCPServer的对象中)。

  1. elif command == "register":
  2. # Add this test runner to our pool
  3. print "register"
  4. address = command\_groups.group(2)
  5. host, port = re.findall(r":(\w\*)", address)
  6. runner = {"host": host, "port":port}
  7. self.server.runners.append(runner)
  8. self.request.sendall("OK")

dispatch指令用于代码库监听器下发测试的提交ID给测试运行器。此命令的用法为dispatch: 。分配器从此消息中解析出提交ID并将其发送给测试运行器。

  1. elif command == "dispatch":
  2. print "going to dispatch"
  3. commit\_id = command\_groups.group(2)[1:]
  4. if not self.server.runners:
  5. self.request.sendall("No runners are registered")
  6. else:
  7. # The coordinator can trust us to dispatch the test
  8. self.request.sendall("OK")
  9. dispatch\_tests(self.server, commit\_id)

results指令会由测试运行器在上报测试结果是调用。此命令的用法为results:::用于标识测试报告对应的提交ID。用于计算结果数据使用需要多大的缓冲区。最后, 中是实际报告信息。

  1. elif command == "results":
  2. print "got test results"
  3. results = command\_groups.group(2)[1:]
  4. results = results.split(":")
  5. commit\_id = results[0]
  6. length\_msg = int(results[1])
  7. # 3 is the number of ":" in the sent command
  8. remaining\_buffer = self.BUF\_SIZE \- \
  9. (len(command) \+ len(commit\_id) \+ len(results[1]) \+ 3)
  10. if length\_msg > remaining\_buffer:
  11. self.data \+= self.request.recv(length\_msg \- remaining\_buffer).strip()
  12. del self.server.dispatched\_commits[commit\_id]
  13. if not os.path.exists("test\_results"):
  14. os.makedirs("test\_results")
  15. with open("test\_results/%s" % commit\_id, "w") as f:
  16. data = self.data.split(":")[3:]
  17. data = "\n".join(data)
  18. f.write(data)
  19. self.request.sendall("OK")

测试运行器( test_runner.py )

测试运行器负责对给定的提交ID运行测试,并上报测试结果。它仅会与分配器通信,分配器负责为其提供需要运行测试的提交ID,并且会接收测试结果报告。

test_runner.py文件会以启动测试运行器服务的serve函数为入口,并启动一个线程来运行dispatcher_checker函数。由于此启动过程与repo_observer.py和dispatcher.py的启动过程非常相似,因此我们在这里就不再赘述。

dispatcher_checker函数每五秒对分配器执行一次ping操作,以确保它仍然在正常运行。这个操作主要是出于资源管理上的考虑。如果对应的分配器挂了,就需要将测试运行器也关闭。 否则测试运行器就只能空跑,及接收不到新的任务也无法提交之前任务产生报告。

  1. def dispatcher\_checker(server):
  2. while not server.dead:
  3. time.sleep(5)
  4. if (time.time() \- server.last\_communication) > 10:
  5. try:
  6. response = helpers.communicate(
  7. server.dispatcher\_server["host"],
  8. int(server.dispatcher\_server["port"]),
  9. "status")
  10. if response != "OK":
  11. print "Dispatcher is no longer functional"
  12. server.shutdown()
  13. return
  14. except socket.error as e:
  15. print "Can't communicate with dispatcher: %s" % e
  16. server.shutdown()
  17. return

测试运行器的服务于分配器相同都是ThreadingTCPServer ,它需要多线程运行的是因为分配器既会向它下发提交ID,也可能在测试运行的期间的ping它。

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): dispatcher_server = None # Holds the dispatcher server host/port information last_communication = None # Keeps track of last communication from dispatcher busy = False # Status flag dead = False # Status flag

整个通信流是从分配器向测试运行器发送需要运行测试的提交ID开始的。如果测试运行器的状态可以运行测试,它会发送确认消息回分配器,然后关闭第一个连接。为了使测试运行器在跑测试的同时还能接受来自分配器的其他请求,它会单独启动一个进程来运行测试。

这样,当分配器在测试运行器正在运行测试的时候发来一个请求(比如一个ping请求), 测试运行器的测试跑在另一个进程上,运行器服务本身仍然可以在作出响应。这样测试运行器就可支持同时运行多个任务了。还有一种替代多线程运行的设计是在分配器与测试运行器间建立一个长连接。 但这样会在分配器端消耗大量的内存来维持连接,另外这种方式还对网络有强依赖。 如果网络一旦产生波动(比如突然的断线)就会对系统造成破坏。

测试运行器会从分配器接收到两种消息。 第一种是ping消息 ,分配器用这个消息来验证测试运行器是否仍处于活跃状态。

class TestHandler(SocketServer.BaseRequestHandler): …

  1. def handle(self):
  2. ....
  3. if command == "ping":
  4. print "pinged"
  5. self.server.last\_communication = time.time()
  6. self.request.sendall("pong")

另一个是 runtest ,它的格式是runtest: 。这条指令用于分配器下发需要测试的提交ID。当接收到runtest时,测试运行器将检查当前是否有正在运行的测试。如果有,它会给分配器返回BUSY的响应。如果没有,它会返回OK,将其状态设置为busy并运行其run_tests函数。

  1. elif command == "runtest":
  2. print "got runtest command: am I busy? %s" % self.server.busy
  3. if self.server.busy:
  4. self.request.sendall("BUSY")
  5. else:
  6. self.request.sendall("OK")
  7. print "running"
  8. commit\_id = command\_groups.group(2)[1:]
  9. self.server.busy = True
  10. self.run\_tests(commit\_id,
  11. self.server.repo\_folder)
  12. self.server.busy = False

这个函数会调用一个叫test_runner_script.sh 的shell脚本,该脚本会将代码更新为给定的提交ID。脚本返回后,如果代码库已经被成功的更新,运行器会使用unittest运行测试并将结果收集到一个文件中。测试运行完毕后,测试运行器将读入结果报告文件,并将报告发送给调度程序。

  1. def run\_tests(self, commit\_id, repo\_folder):
  2. # update repo
  3. output = subprocess.check\_output(["./test\_runner\_script.sh",
  4. repo\_folder, commit\_id])
  5. print output
  6. # run the tests
  7. test\_folder = os.path.join(repo\_folder, "tests")
  8. suite = unittest.TestLoader().discover(test\_folder)
  9. result\_file = open("results", "w")
  10. unittest.TextTestRunner(result\_file).run(suite)
  11. result\_file.close()
  12. result\_file = open("results", "r")
  13. # give the dispatcher the results
  14. output = result\_file.read()
  15. helpers.communicate(self.server.dispatcher\_server["host"],
  16. int(self.server.dispatcher\_server["port"]),
  17. "results:%s:%s:%s" % (commit\_id, len(output), output))

这是test_runner_script.sh的内容 :

!/bin/bash

REPO=$1 COMMIT=$2 source run_or_fail.sh run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null run_or_fail "Could not clean repository" git clean -d -f -x run_or_fail "Could not call git pull" git pull run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"

要运行test_runner.py ,必须将其指向存储库的副本。你可以使用我们先前创建的/path/to/test_repo test_repo_clone_runner 副本作为启动参数。默认情况下, test_runner.py将在localhost的8900-9000端口上启动,并尝试连接到localhost:8888上的调度程序服务器。你通过可以一些可选参数来更改这些值。--host和--port参数用于指定运行测试运行器服务器地址和端口, --dispatcher-server参数指定调度程序的地址。

控制流程图

图2.1是该系统的概述图。图中假设所有三个文件( repo_observer.py , dispatcher.py和test_runner.py )都已在运行,并描述了每个进程在新的提交发生时所采取的操作。

图2.1 - 控制流程

运行代码

我们可以在本地运行这个简单的CI系统,为每个进程使用不同的终端shell。我们首先启动分配器,它默认运行在端口8888上:

$ python dispatcher.py

开一个新的的shell,我们启动测试运行器(这样它就可以在分配器中注册了):

$ python test_runner.py

测试运行器将自动为自己分配端口,范围为8900-9000。你可以根据需求尽可能多起几个测试运行器。

最后,在另一个新shell中,让我们启动代码库监听器:

$ python repo_observer.py --dispatcher-server=localhost:8888

现在万事俱备,让我们触发一些测试吧玩一下吧!根据设计我们需要创建一个新的提交来触发测试。切换到你的主代码仓库中, 随便改点什么:

$ cd /path/to/test_repo $ touch new_file $ git add new_file $ git commit -m"new file" new_file

然后repo_observer.py识别到有一个新的提交产生了,之后通知分配器。你可以在它们各自的shell窗口中查看它们的运行日志。当分配器收到测试结果,它就会将它们存储在此代码库中的test_results/文件夹中,并使用提交ID作为文件名。

错误处理

该CI系统中包括一些简单的错误处理。

如果你讲test_runner.py进程杀掉, dispatcher.py将确定该运行器将会识别出这个节点已经不再活跃,并将其从运行器池中移除。

你也可以模拟网络或系统故障,在测试运行器执行测试的时候将它杀死。这时,分配器会识别到运行器已经挂了,它会将挂掉的运行器从运行器池中移除,并将这个运行器之前在执行的任务分配给池中其他的运行器。

如果你杀掉分配器,那么监听器会直接报错。测试运行器也会发现分配器不再运行,并自动关闭。

结论

通过逐个分析各个进程中的不同功能,我们对构建了一个分布式的持续集成系统的有了一些基本的认识。通过套接字请求实现进程间的通信,我们的CI系统可以分布式的运行在不同的机器上,这增强了我们的系统可靠性和可扩展性。

这套CI系统现在的功能仍然非常简单,你还可以发挥自己的才能对它进行各种扩展以实现更多功能。以下是一些改进建议:

对每次提交自动运行测试

当前系统将定期检查是否有新的提交并对最近的一次提交运行测试。这个设计可以改为每次提交都触发测试。你可以修改定期检查程序,获取在两次轮询中发生的所有提交来实现这个功能。

更智能的运行器

如果测试运行器检测到分配器没有响应,则它将停止运行。当测试运行器正在运行测试时,也会立即关闭!如果测试运行器可以有一段时的等待期或者长期运行(如果你并不在乎它对资源的占用)来等待分配器恢复可能会更好一些。这样当分配器恢复时,运行器既可以将之前执行的测试的报告重新发回分配器。这样可以避免因分配器故障而引起的重复任务,在对每一个提交都执行测试时,这将很大程度上节约运行器资源。

报告展示

在真正的CI系统中,测试报告一般会发送到一个单独的报告服务。在报告系统中,人们可以查看报告详情,或是设置一些通知规则,在遇到故障或其他一些特殊的情况下通知相关人员。你可以为我的CI系统创建一个独立的报告进程,替换掉分配器的报告收集功能。这个新的进程可以是一个Web服务(或链接到一个Web服务上), 这样我们就可以在网页上直接在线查看测试报告,甚至可以用一个邮件服务器来实现测试失败时的提醒。

测试运行器管理器

在当前的系统中,我们必须手动运行test_runner.py文件来启动测试运行器。你可以创建一个测试运行器管理器进程,通过这个进程来管理查看所有运行器上的负载和来自分配器的请求,对运行器的数量进行相应的调整。这个进程会接受所有的测试任务,根据任务启动测试运行器,并在任务少的时候减少运行器的实例。

遵循这些建议,你可以使这个简单的CI系统更加健壮并且容错率更高,并且具有与其他系统(比如一个网页版的报告查看器)集成的能力。

如果你希望了解现在的持续集成系统可以实现到什么样的灵活性,我建议你去看看Jenkins ,这是一个用Java编写的非常强大的开源CI系统。它提供了一个基本的CI系统,同时也允许使用插件进行扩展。你可以通过GitHub访问其源代码。另一个推荐的项目是Travis CI ,它是用Ruby编写的,其源代码也可以通过GitHub 得到。

这是了解CI系统如何工作以及如何自己构建CI系统的尝试。现在你应该对制作一个可靠的分布式系统所需的内容有了更深入的了解,希望你可以利用这些知识开发更复杂的解决方案。

使用Bash是因为我们需要 检查文件存在,创建文件和使用Git,而shell脚本是实现这一目标的最直接,最简单的方法。当然,你也可以使用一些跨平台的Python包;例如,用的Python内置的os模块访问文件系统,用GitPython访问Git,但是用这些包实现起来稍微有一点不直观。↩

  1. </path></path></commit></results></length></commit></results></length></commit></commit></commit></code>