継続的インテグレーションシステム

継続的インテグレーションシステムとは何か?

ソフトウェアの開発では、新しい機能やバグフィックスが期待通りに動作し新たなバグを生まないことの検証が求められる。この検証はコードに対するテストを実行することで行われる。開発者自身がローカルでテストを実行して自身の変更がバグを生まないことを検証する場合もあるものの、開発しているソフトウェアが実行される全てのシステムでコードをテストする時間を開発者が持たない可能性がある。さらに、テストが増えればそれだけテストの実行時間も増えるので、ローカルでテストを実行することは現実的でなくなる。この問題に対処するために継続的インテグレーションシステムが作成されてきた。

継続的インテグレーション (continuous integration, CI) システムは新しいコードのテストに特化した専用システムである。コードレポジトリの変更がコミットされたとき、そのコミットがテストを破壊しないことの検証が CI システムの責務となる。これを行うために、CI システムは新しい変更をフェッチし、テストを実行し、その結果を報告できる必要がある。「システム」と名の付く他の仕組みと同様に、CI システムは障害に強くなければならない。これはシステムの一部で障害が起きた場合でもシステム全体は終了せず、障害が起きた地点から処理をやり直す機能を持つことを意味する。

CI システムは高い負荷を適切に扱える必要もある。つまり、テストの実行時間よりも短い間隔でコミットが行われる状況であっても、テストの結果報告が非常に遅くなる事態が起きてはならない。本章で紹介する CI システムでは、テストの実行を分散・並列化することでこれを達成する。本プロジェクトが示すのは拡張性を念頭に設計された必要最低限の機能を持つ小さな CI システムである。

プロジェクトの制約と注意事項

本プロジェクトはテストされるコードのレポジトリが Git で管理されると仮定する。利用する Git コマンドはソースコード管理に関する標準的なものだけなので、svn や Mercurial といった Git 以外のバージョン管理システム (VCS) しか知らなかったとしても本章を読むことはできる。

コードの長さとユニットテストの制約があるので、テストを見つける処理は単純化されている。具体的には、レポジトリの tests ディレクトリにあるテストだけが実行される。

一般的な CI システムが監視するマスターレポジトリはウェブサーバーでホストされる場合が多く、通常はローカル (CI と同じファイルシステム上) にマスターレポジトリは存在しない。しかし、本章で説明する CI システムはリモートレポジトリではなくローカルレポジトリをマスターとして使用する。

一般的な CI システムは固定された定期的な実行スケジュールを持つ必要はなく、数コミットごとまたは各コミットごとにテストを実行するように設定できる。ただ、本章で紹介する CI システムは定期的に実行される。具体的には、変更を 5 秒おきに確認し、前回の確認から現在までの間に行われたコミットの中で最新のものに対してテストを実行する。直近の 5 秒間に行われた全てのコミットをテストするわけではなく、その中で最新のものだけがテストされる。

本章で紹介する CI システムはレポジトリの変更を定期的に確認する。現実世界の CI システムでは、ホストされたレポジトリから通知を受け取るレポジトリオブザーバーの仕組みを利用できる場合がある。例えば Github には「Webhook」と呼ばれる仕組みがあり、コミットがあったときに指定された URL に通知を送信するようにできる。Webhook を使うとき、レポジトリオブザーバーは特定の URL でホストされるウェブサーバーで実行され、受け取った通知に応答する。この仕組みはローカルで再現するには複雑なので、本プロジェクトではレポジトリオブザーバーが通知を待つのではなく変更を能動的に確認するモデルを用いる。

一般的な CI システムはテストの結果を人間が読める形で (例えばウェブページとして) 出力する機能も持つ。実装を単純にするため、本プロジェクトではディスパッチャプロセスがテストの結果を収集し、ローカルファイルシステムに結果を保存する。

本章で紹介する CI システムは可能な CI システムの一つに過ぎない点に注意してほしい。以上のアプローチは三つの主要なコンポーネントのケーススタディを単純化するために選択された。

導入

CI システムは基本的に三つのコンポーネントから構成される: オブザーバー、テストジョブディスパッチャ、そしてテストランナーである。オブザーバーはレポジトリを監視し、新しいコミットを検出するとディスパッチャに通知する。ディスパッチャは手の空いているランナーを見つけ、そのランナーにテストすべきコミット番号を伝える。

CI システムのアーキテクチャには様々な選択肢がある。オブザーバー、ディスパッチャ、ランナーを同じマシンの同じプロセスで実行することもできる。しかし、このアプローチは高い負荷を処理できない: CI システムの処理が追い付かない速度でコミットが行われると、未実行のテストが積み上がっていく。また、このアプローチは障害にも弱い: CI システムを実行するマシンで障害や電源喪失が起こると、フォールバックシステムが存在しないのでテストが実行されなくなる。理想的なシステムはテストジョブがどんなに多くても適切に処理でき、マシンが落ちた場合でも最善の補償を提供するよう試みる。

障害と高負荷に耐える CI システムを構築するため、本プロジェクトでは三つのコンポーネントがそれぞれ個別のプロセスを持つ。こうすると各プロセスが独立し、さらに各コンポーネントのインスタンスを複数起動することも可能になる。これは実行しなければならないテストが同時に二つ以上存在するとき有用となる。テストランナーを実行するプロセスはいくつでも生成できるので、望む限りのジョブを並列に実行できる。未処理のテストが積み上がっていくことはない。

本プロジェクトでは、各コンポーネントが個別のプロセスとして実行されるだけではなく、コンポーネントを実行するプロセス間の通信がソケットを通して行われる。そのため、ネットワークで接続された異なるマシン上のプロセスでコンポーネントを実行できる。ホスト名とポートからなる一意なアドレスがそれぞれのコンポーネントに割り当てられ、そのアドレスにメッセージを送ることで通信が行われる。

この設計によって分散されたアーキテクチャが可能になり、ハードウェア障害が自然に対処可能になる。オブザーバー、ディスパッチャ、ランナーをそれぞれ異なるマシンで実行してネットワークを通じて通信するようにすれば、いずれかのマシンで障害が発生した場合でも、新しいマシンをネットワークに接続して、そのマシン上のプロセスで停止したコンポーネントを実行すれば問題は起こらない。つまりシステムはフェイルセーフとなる。

障害からの具体的な復帰方法は分散システムのアーキテクチャによって異なるので、本プロジェクトは自動復帰のコードを含まない。現実世界の CI システムは上述したような分散環境で実行されるので、フェイルオーバーを行うための冗長性を持つ (つまり、コンポーネントを実行するプロセスを持つマシンで障害が発生した場合でも待機中のマシンにフォールバックできる)。

本プロジェクトでは、各プロセスはローカルに手動で起動され、異なるローカルポートを利用する。

プロジェクトのファイル

本プロジェクトには三つのコンポーネントを実装する Python ファイルが含まれる: レポジトリオブザーバー (repo_observer.py)、テストジョブディスパッチャ (dispatcher.py)、そしてテストランナー (test_runner.py) である。これら三つのファイルは異なるプロセスで実行され、ソケットを使って通信する。情報をやり取りする処理は三つのコンポーネントで共通なので、そのコードは helpers.py にある。同じコードを各ファイルに重複させるのではなく、helpers.py にある関数を各プロセスがインポートして利用する。

各プロセスは Bash スクリプトも利用する。これらの Bash スクリプトは Bash と Git のコマンドを実行するために利用される。Python の OS レベルのモジュール (ossubprocess) よりも Bash の方が簡単にコマンドを実行できる。

最後に、tests ディレクトリには CI システムが実行するテストの例が含まれる。一つのテストは成功し、もう一つのテストは失敗する。

初期設定

この CI システムは分散システムでも実行できる。ただ、まずは単一のマシンで全てのコンポーネントを実行してシステム全体の動作を確認しよう。こうしておけばネットワーク関連の問題が発生する心配はない。もし分散環境で実行したい場合は、各コンポーネントをそれぞれ異なるマシンで実行すればよい。

CI システムはコードレポジトリの変更を検出したときにテストを実行する。そこで、CI システムが監視するレポジトリを最初に作成する。ディレクトリの名前は test_repo とする。

$ mkdir test_repo
$ cd test_repo
$ git init

これが「マスター」のレポジトリとなる: 開発者たちはこのレポジトリにコードをチェックインする。CI システムはこのレポジトリに新しいコミットがあるかどうかを確認し、もしあるならテストを実行する。マスターレポジトリに対する新しいコミットの監視はレポジトリオブザーバーの責務である。

レポジトリオブザーバーはコミットを確認することで動作するので、マスターレポジトリに少なくとも一つのコミットが必要になる。テストのコードをコミットして、実行すべきテストが存在する状態にしてみよう。本章のコードの tests ディレクトリを test_repo にコピーしてコミットする。

$ cp -r /this/directory/tests /path/to/test_repo/
$ cd /path/to/test_repo
$ git add tests/
$ git commit -m "add tests"

これでマスターレポジトリがコミットを持つ状態になった。

レポジトリオブザーバーは自身専用のコードのクローンを保持し、それとマスターレポジトリを比較することで新しいコミットを検出する。レポジトリオブザーバー用のクローンを test_repo_clone_obs に作成しよう。

$ git clone /path/to/test_repo test_repo_clone_obs

テストランナーも自身専用のコードのクローンを保持し、テストを実行すべきコミットにおけるレポジトリをチェックアウトするときに利用する。テストランナー用のクローンを test_repo_clone_runner に作成しよう。

$ git clone /path/to/test_repo test_repo_clone_runner

三つの主要コンポーネント

レポジトリオブザーバー (repo_observer.py)

レポジトリオブザーバーはレポジトリを監視し、新しいコミットを検出するとディスパッチャに通知する。全てのバージョン管理システム (VCS) を扱えるように、レポジトリオブザーバーでは監視対象のレポジトリで新しいコミットがあるかどうかを定期的に確認する設計が採用された。通知システムを組み込みで持たない VCS もあるので、VCS からレポジトリオブザーバーに新しいコミットを通知させる設計にはなっていない。

レポジトリオブザーバーはレポジトリを定期的に確認し、変更を見つけるとディスパッチャにテストすべき最新のコミット ID を伝える。変更の有無はマスターレポジトリの自分専用のクローンを更新し、コミット ID が変わったかどうかで判断される。本プロジェクトでは、最新のコミットに対するテストだけがディスパッチされる。このため、定期的な確認の間にコミットが二回あった場合でも、レポジトリオブザーバーは最新のコミットに対するテストだけを指示する。通常の CI システムは最後にテストされたコミットからの全てのコミットを検出し、全ての新しいコミットに対するテストをディスパッチする。ただ、本プロジェクトでは処理を簡単にするため変更した。

レポジトリオブザーバーは自身が監視すべきレポジトリを知る必要がある。先述の例で言えば、このレポジトリはマスターのクローン /path/to/test/repo_clone_obs である。このクローンを使えるようにするために、そのパスが repo_observer.py の起動時にコマンドライン引数として渡される。クローンのパスが分かれば、マスターレポジトリからの pull が可能になる。

レポジトリオブザーバーはディスパッチャにメッセージを送信するので、ディスパッチャのアドレスを知っておく必要もある。コマンドライン引数 --dispatcher-server を使うと起動時にディスパッチャのサーバーアドレスを指定できる。この引数を渡さないと、デフォルトのアドレス localhost:8888 が使われる。

def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, " \
                        "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

レポジトリオブザーバーのファイルが起動されると poll 関数が呼び出される。この関数はコマンドライン引数をパースし、その後レポジトリの変更を定期的に確認する無限 while ループに入る。この whlie ループでは最初に update_repo.sh という Bash スクリプトが呼び出される1

    while True:
        try:
            # レポジトリを更新し、変更を確認する Bash スクリプトを実行する。
            # 変更があるなら、カレントディレクトリで最新のコミットがチェックアウトされ、
            # .commit_id という名前のファイルが作成される。
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. " +
                            "Reason: %s" % e.output)

update_repo.sh は新しいコミットの有無を確認し、もし存在するならレポジトリオブザーバーにその事実を伝える。具体的には、現在のコミット ID を記録し、git pull を実行し、最新のコミット ID を確認する。二つのコミット ID が一致するならレポジトリは変更されていないので、レポジトリオブザーバーは何もする必要がない。もし一致しないなら変更が起きているので、update_repo.sh.commit_id というファイルを作成し、そこに最新のコミット ID を書き込む。

update_repo.sh は次の処理を行う。まず、source run_or_fail.shrun_or_fail.sh というファイルを実行する。このファイルでは run_or_fail というヘルパー関数が定義される。run_or_fail は指定されたコマンドを実行し、失敗した場合は指定されたメッセージを表示する関数であり、本プロジェクトの他の Bash スクリプトでも利用される。

#!/bin/bash

source run_or_fail.sh

続いて update_repo.sh はファイル .commit_id の削除を試みる。この update_repo.shrepo_observer.py から何度も呼ばれるので、前回の update_repo.sh の実行が新しいコミットを見つけて .commit_id を作成している可能性がある。そのため最初に .commit_id を削除し、新しいコミットが見つかったときにだけ .commit_id を新しく作成する。

bash rm -f .commit_id

.commit_id を (存在するなら) 削除した後、update_repo.sh はコマンドライン引数として渡されたディレクトリが存在することを確認し、そのディレクトリにあるレポジトリに対してクローン後に行われた余計な変更を打ち消すために git reset --hard HEAD を実行する。

run_or_fail "Repository folder not found!" pushd $1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD

その後 git log を実行し、出力をパースして最新のコミット ID を取得する。

COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

そしてマスターレポジトリから pull して最新の変更を取り込み、同様の方法で最新のコミット ID を取得する。

run_or_fail "Could not pull from repository" git pull
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
NEW_COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

最後に、二つのコミット ID が一致しなければテストすべき新しいコミットがあるということなので、最新のコミット ID を .commit_id ファイルに保存する。

# コミット ID が一致しないなら、最新のコミット ID をファイルに保存する
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
  popd 1> /dev/null
  echo $NEW_COMMIT_ID > .commit_id
fi

repo_observer.py が呼び出した update_repo.sh が実行を終えると、レポジトリオブザーバーは .commit_id ファイルがあるかどうかをチェックする。このファイルの存在はテストすべき新しいコミットの存在を意味するので、そのコミット ID をディスパッチャに伝えてテストを開始させる必要がある。レポジトリオブザーバーは最初にディスパッチャサーバーに接続して 'status' リクエストを送信することでサーバーの状態を取得し、通知を受け取る準備が整っていることを確認する。

        if os.path.isfile(".commit_id"):
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher: %s" % e)

もしサーバーからの応答が "OK" なら、レポジトリオブザーバーは .commit_id ファイルを開いて最新のコミット ID を読み込み、ディスパッチャサーバーに dispatch:<コミット ID> メッセージを送信する。その後 5 秒間だけ処理を停止し、それから同じ処理を繰り返す。これまでに説明してきた処理で何か異常があった場合も 5 秒後に最初から同じ処理が繰り返される。

            if response == "OK":
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" % response)
                print "dispatched!"
            else:
                raise Exception("Could not dispatch the test: %s" % response)
        time.sleep(5)

レポジトリオブザーバーは以上の処理を KeyboardInterrupt (Ctrl+C) または kill シグナルでプロセスが終了させられるまで繰り返す。

ディスパッチャ (dispatcher.py)

ディスパッチャはテスト実行タスクの起動に用いられる個別のサービスである。ポートにリッスンし、テストランナーとレポジトリオブザーバーの両方からリクエストを受け付ける。テストランナーは自身を登録するとき、そしてレポジトリオブザーバーはテストを実行すべき新しいコミット ID を伝えるときにディスパッチャにリクエストを送信する (後者のリクエストを受け取ったディスパッチャは伝えられたコミット ID に対応するテストランナーをディスパッチする)。また、テストランナーの障害に対処するのもディスパッチャの責務であり、ディスパッチャは異常終了したテストランナーに割り当てたコミット ID に対するテストを再度ディスパッチする。

dispatch.py を起動すると serve 関数が呼び出される。この関数は最初にコマンドライン引数をパースする。ユーザーはコマンドライン引数を通してディスパッチャのホストとポートを指定できる。

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

その後、三つのスレッドが起動される。ディスパッチャのサーバーを実行するスレッド、runner_checker 関数を実行するスレッド、そして redistribute 関数を実行するスレッドである。

    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print "serving on %s:%s" % (args.host, int(args.port))

    ...

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # サーバーを起動する
        # Ctrl+C または Cmd+C でプログラムを kill するまで実行は続く
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # 例外が起こったときはスレッドを停止する
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()

runner_checker 関数は登録されたテストランナーに定期的な ping を送信し、テストランナーが応答可能な状態にあることを確認する。応答しないテストランナーはプールから削除され、そのランナーがテストしていたコミット ID は次に手の空いたランナーにディスパッチされる。この関数は未ディスパッチのコミット ID を引数 serverpending_commits 属性に記録する。

    def runner_checker(server):
        def manage_commit_lists(runner):
            for commit, assigned_runner in server.dispatched_commits.iteritems():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)
        while not server.dead:
            time.sleep(1)
            for runner in server.runners:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"],
                                                   int(runner["port"]),
                                                   "ping")
                    if response != "pong":
                        print "removing runner %s" % runner
                        manage_commit_lists(runner)
                except socket.error as e:
                    manage_commit_lists(runner)

redistribute 関数は server.pending_commits に含まれるコミット ID のそれぞれに対して dispatch_tests 関数を呼び出すことで、未処理のコミット ID をディスパッチする。

    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print "running redistribute"
                print server.pending_commits
                dispatch_tests(server, commit)
                time.sleep(5)

dispatch_tests 関数は最初に登録されたテストランナーの中から手の空いているものを見つける。もし手の空いているテストランナーが見つかれば runtest:<コミット ID> メッセージを送信し、もし見つからなければ 2 秒間だけ待ってから同じ処理を繰り返す。ディスパッチが完了すると、ディスパッチされたコミット ID とそれを担当するテストランナーが server.dispatched_commits に記録される。

def dispatch_tests(server, commit_id):
    # NOTE: 通常このループはいずれ終了する
    while True:
        print "trying to dispatch to runners"
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           "runtest:%s" % commit_id)
            if response == "OK":
                print "adding id %s" % commit_id
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)

ディスパッチャのサーバー処理で利用される SocketServer モジュールは非常に単純なサーバー機能を提供する Python の標準ライブラリである。SocketServer モジュールは TCP, UDP, UnixStreamServer, UnixDatagramServer という四つの基礎的な種類のサーバーを提供する。本プロジェクトではサーバー同士が順序の保存される連続ストリームをやり取りできる必要があるので、TCP ベースのソケットサーバーが用いられる。UDP では順序や連続性が保証されない。

SocketServer が提供する TCPServer は同時に一つのリクエストしか処理できない。そのため、例えばディスパッチャがテストランナーとの接続を処理しているときにレポジトリオブザーバーが接続してきてもディスパッチャは対応できず、レポジトリオブザーバーは接続の終了まで待つ必要がある。CI システムでディスパッチャはレポジトリオブザーバーおよび全てのテストランナーと迅速に直接通信できる必要があるので、これは望ましくない。

ディスパッチャが複数の接続を同時に処理できるようにするために、標準ライブラリが提供する SocketServer にスレッディング機能を追加した ThreadingTCPServer という独自クラスが利用される。「スレッディング機能」とは、ディスパッチャが接続リクエストを受け取るたびにその接続を担当する新しいプロセスを生成することを意味する。この仕組みがあればディスパッチャは複数のリクエストを同時に処理できる。

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    runners = []            # テストランナーのプール
    dead = False            # 実行が終了したことを他のスレッドに伝える変数
    dispatched_commits = {} # ディスパッチしたコミットの ID
    pending_commits = []    # ディスパッチしていないコミットの ID

ディスパッチャサーバーの動作はそれぞれのリクエストに対するハンドラによって定義される。このハンドラが次に示す DispatcherHandler であり、このクラスは SocketServer モジュールの BaseRequestHandler を継承する。基底クラス BaseRequestHandler を使うとき、接続がリクエストされるたびに呼ばれる handle メソッドを定義するだけでハンドラが利用可能となる。DispatcherHandler クラスの handle メソッドが本 CI システム独自の処理を行うハンドラであり、接続のたびに呼び出される。この関数は新たな接続リクエスト self.request を調べ、どのコマンドがリクエストされているのかを判断する。

class DispatcherHandler(SocketServer.BaseRequestHandler):
    """
    ディスパッチャサーバーが使用する RequestHandler クラス
    レポジトリオブザーバーからのコミット ID の通知、および
    テストランナーからの登録要求とテスト結果の報告を処理する。
    """
    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024
    def handle(self):
        self.data = self.request.recv(self.BUF_SIZE).strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)

この関数が処理するコマンドは status, register, dispatch, results の四つである。status コマンドはディスパッチャサーバーが実行され応答可能な状態にあることの確認に利用される。

        if command == "status":
            print "in status"
            self.request.sendall("OK")

ディスパッチャが実際に仕事をするには、少なくとも一つのテストランナーが登録されている必要がある。この登録処理は register コマンドで行われ、register コマンドを受け取ったディスパッチャはテストランナーのホストとポートをリストに保存する (このリストは ThreadingTCPServer オブジェクトに関連付けられる)。この情報はテストを実行すべきコミット ID をテストランナーに伝えるときに利用される。

        elif command == "register":
            # テストランナーをプールに追加する
            print "register"
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port":port}
            self.server.runners.append(runner)
            self.request.sendall("OK")

dispatch コマンドはレポジトリオブザーバーがテストランナーにコミット ID を伝えるときに利用する。このコマンドは dispatch:<コミット ID> というフォーマットのメッセージで表される。ディスパッチャはメッセージをパースしてコミット ID を取得し、それをテストランナーに送信する。

        elif command == "dispatch":
            # テストをディスパッチせよという指示が来た。
            print "going to dispatch"
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered")
            else:
                self.request.sendall("OK")
                dispatch_tests(self.server, commit_id)

results コマンドはテストランナーが完了したテストの結果を報告するときに利用する。このコマンドは results:<コミット ID>:<result の長さ>:<results> というフォーマットのメッセージで表される。<コミット ID> は実行されたテストが検証したコミット ID を表す。<result> はテスト結果を表す出力である。

        elif command == "results":
            print "got test results"
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 = コマンドに含まれる ":" の個数
            remaining_buffer = self.BUF_SIZE - \
                (len(command) + len(commit_id) + len(results[1]) + 3)
            if length_msg > remaining_buffer:
                self.data += self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "\n".join(data)
                f.write(data)
            self.request.sendall("OK")

テストランナー (test_runner.py)

テストランナーの役割は与えられたコミット ID に対するテストを行い、結果を報告することである。テストランナーはディスパッチャサーバーとのみ通信し、テストするコミット ID の受け取りとテスト結果の報告をそこで行う。

test_runner.py が起動されると、テストランナーサーバーを実行する serve 関数が呼び出され、同時に dispatcher_checker 関数を実行するスレッドが起動される。この開始処理は repo_observer.pydispatcher.py で見たものとよく似ているので、ここでは省略する。

dispatcher_checker 関数は 5 秒ごとにディスパッチャサーバーに ping を送信し、ディスパッチャサーバーが応答可能な状態にあることを確認する。この処理はリソースを管理する上で重要である。ディスパッチャサーバーが落ちているときテストランナーはテストの開始も結果の報告もできないので、意味のある処理を一切行えない。

    def dispatcher_checker(server):
        while not server.dead:
            time.sleep(5)
            if (time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                                       server.dispatcher_server["host"],
                                       int(server.dispatcher_server["port"]),
                                       "status")
                    if response != "OK":
                        print "Dispatcher is no longer functional"
                        server.shutdown()
                        return
                except socket.error as e:
                    print "Can't communicate with dispatcher: %s" % e
                    server.shutdown()
                    return

テストランナーは ThreadingTCPServer であり、ディスパッチャサーバーと同様のパターンが利用される。ディスパッチャサーバーはテストランナーに対してコミット ID を伝達するだけではなく定期的な ping も行うので、個別のスレッドを使った処理が必要になる。

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    dispatcher_server = None   # ディスパッチャサーバーのホスト/ポート
    last_communication = None  # ディスパッチャサーバーからの最後の通信
    busy = False # 状態フラグ
    dead = False # 状態フラグ

通信フローはディスパッチャサーバーからのコミット ID 受理のリクエストから始まる。このリクエストを受け取ったテストランナーがジョブを実行可能なら、肯定的な返答を送信して通信を閉じ、テストの実行を開始する。テストの実行中もディスパッチャサーバーからのリクエストの受理とテストの開始ができるように、テストは新しく起動されたスレッドで実行される。

このため、テストランナーはテストを (個別のスレッドで) 実行しているときでもディスパッチャサーバーからのリクエストに問題なく応答できる。テスト実行中でも ping に応答できるのに加えて、一つのテストランナーで複数のテストを同時に実行することもできる。これとは異なる設計としては、ディスパッチャサーバーが各テストランナーサーバーとの接続を保持する設計が考えられる。しかし、こうするとディスパッチサーバーのメモリ使用量が大きくなり、障害による意図しない接続の切断といったネットワークの問題に弱くなる。

テストランナーサーバーとディスパッチャサーバーの通信で使われるメッセージは二種類ある。一つ目の ping メッセージは、テストランナーが応答可能な状態にあることをディスパッチャサーバーが確認するために利用される。

class TestHandler(SocketServer.BaseRequestHandler):
    ...

    def handle(self):
        ....
        if command == "ping":
            print "pinged"
            self.server.last_communication = time.time()
            self.request.sendall("pong")

二つ目の runtest メッセージは特定のコミットに対するテストの開始を指示するメッセージであり、runtest:<コミット ID> というフォーマットをしている。runtest メッセージを受け取ったテストランナーはテストが既に実行中かどうかを確認し、もしそうなら BUSY メッセージをディスパッチャサーバーに返答する。そうでなく新しいテストを実行可能なら OK メッセージを返答し、自身の状態を busy に変更し、run_tests 関数を実行する。

        elif command == "runtest":
            print "got runtest command: am I busy? %s" % self.server.busy
            if self.server.busy:
                self.request.sendall("BUSY")
            else:
                self.request.sendall("OK")
                print "running"
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id,
                               self.server.repo_folder)
                self.server.busy = False

run_tests 関数はシェルスクリプト test_runner_script.sh を呼び出してレポジトリを指定したコミット ID の状態に更新する。このスクリプトが成功したならレポジトリはテストすべきコミット ID の状態にあるので、unittest モジュールを使ってテストを実行し、結果をファイルにまとめる。テストの実行が終わったら結果を収めたファイルを読み、それを results メッセージとしてディスパッチャに送信する。

    def run_tests(self, commit_id, repo_folder):
        # レポジトリを更新する
        output = subprocess.check_output(["./test_runner_script.sh",
                                        repo_folder, commit_id])
        print output
        # テストを実行する
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # ディスパッチャに結果を伝える
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
                            int(self.server.dispatcher_server["port"]),
                            "results:%s:%s:%s" % (commit_id, len(output), output))

test_runner_script.sh は次の通りである:

#!/bin/bash
REPO=$1
COMMIT=$2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"

test_runner.py を実行するには、テストの実行で利用するレポジトリのクローンがあるディレクトリをコマンドライン引数に与える必要がある。今考えている例ではテストランナー用に以前に作成したクローンがあるディレクトリ名 /path/to/test/test_repo_clone_runner を指定する。test_runner.py はデフォルトで 89009000 の範囲のポートを使ってローカルホスト上にサーバーを作成し、localhost:8888 にあるディスパッチャサーバーに接続を試みる。コマンドライン引数 --host--port を使えばテストランナーサーバーが利用するアドレスとポートを指定でき、--dispatcher-server を使えばディスパッチャサーバーのアドレスを指定できる。

制御フロー図

この CI システムの概観図を図 1 に示す。この図では新しいコミットが作成されるとき三つのファイル (repo_observer.py, dispatcher.py, test_runner.py) が全て実行されているものと仮定されている。

図 1. CI システムの制御フロー
図 1CI システムの制御フロー

コードの実行

この CI システムをローカルで実行するには、三つのファイルに対応する三つの異なるシェル端末が必要になる。まず、次のコマンドでディスパッチャをポート 8888 で開始する:

$ python dispatcher.py

次に、新しいシェルでテストランナーを開始する。開始されたテストランナーはすぐにディスパッチャと通信して、自身を登録する:

$ python test_runner.py <path/to/test_repo_clone_runner>

なお、テストランナーは 89009000 の範囲にある未使用のポートを任意に選んで利用するので、いくつ起動しても問題ない。

最後に、さらに新しいシェルでレポジトリオブザーバーを起動する:

$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/repo_clone_obs>

これで全ての準備が整ったので、テストを実行させてみよう! このためには新しくコミットを作成する必要があるので、マスターレポジトリを適当に変更する:

$ cd /path/to/test_repo
$ touch new_file
$ git add new_file
$ git commit -m "new file" new_file

するとレポジトリオブザーバーが新しいコミットを検出し、新しいコミット ID をディスパッチャに通知する。起動したシェルを見れば簡単なログを確認できる。ディスパッチャはテストランナーにテスト実行の指示を出し、受け取ったテスト結果を test_results/ に保存する (コミット ID がファイル名となる)。

エラー処理

この CI システムには単純なエラー処理がいくつか含まれる。

test_runner.py プロセスを kill すると、dispatcher.py はテストランナーが消失したことを検出し、それをプールから取り除く。消失したテストランナーが担当していたジョブは他に利用可能なテストランナーがプールに存在すれば委譲され、そうでなければプールに空きが生まれるまで保留される。マシンのクラッシュやネットワークの障害をシミュレートするためにテストランナーを kill することもできる。

ディスパッチャを kill すると、レポジトリオブザーバーはディスパッチャの消失を検出して例外を送出する。テストランナーも同様にその事実を検出し、シャットダウンする。

結論

個別のプロセスに関心を分離することで、分散された継続的インテグレーションシステムの基礎を作ることができた。プロセス同士がソケットリクエストで通信する設計によって複数のマシンにわたるシステムの分散が可能になり、信頼性とスケーラビリティが向上した。

この CI システムは非常に単純なので、機能の拡張は難しくない。可能な改善の例をいくつか示す:

全てのコミットに対するテストの実行

現在のシステムは新しいコミットを定期的に確認し、そのとき見つけた最新のコミットに対してテストを実行する。この仕組みは全てのコミットをテストするように改善されるべきである。このためには、定期的な確認処理を改変して最後にテストしたコミットから最新のコミットまでのそれぞれに対してテストをディスパッチする必要がある。

賢いテストランナー

ディスパッチャが応答しないことを検出したテストランナーは実行を停止する。この処理はテストの実行中にも行われる! ディスパッチャがオンラインに復帰するまで一定の時間だけ (リソース管理を気にしないなら無制限に) 待機する方が望ましい。つまり、実行中のテストがあるときにディスパッチャの消失を検出したとき即座にシャットダウンするのではなく、まずテストの終了するのを待ち、さらにテスト結果の報告先であるディスパッチャがオンラインになるのを指定された時間だけ待つ。こうするとテストランナーのリソースが節約され、テストがコミットごとにちょうど一度だけ実行される。

現実的なテストレポート

実際の開発で使われる CI システムは、テスト結果を収集して報告するレポーターサービスを持つ場合が多い。テスト結果は人間が読める形で公開され、テストの失敗などの注目すべきイベントが起こると関係者に通知される。この仕組みを本章で紹介した CI システムで実装するには、報告された結果の収集をディスパッチャではなく新しいプロセスに行わせればよい。この新しいプロセスをウェブサーバーとして (あるいはウェブサーバーに接続して) 結果をオンラインで公開したり、メールサーバーを使ってテストの失敗を購読者に伝えたりといった処理が可能となる。

テストランナーマネージャ

現在のシステムでテストランナーを開始するには test_runner.py を手動で起動する必要がある。こうする代わりに、テストランナーマネージャを実行するプロセスを用意する方法が考えられる。このプロセスはディスパッチャからのテストリクエストの頻度に応じて実行中のテストランナーの個数を調整する。つまり runtest メッセージを受け取ったとき手の空いているテストランナーが存在しなければ新しく起動し、負荷が減少したときは手の空いているテストランナーを kill する。

こういった機能を実装すれば、この単純な CI システムのロバスト性と障害耐性を改善できる。さらに、ウェブベースのテストレポーターといった他のシステムへの統合も可能になる。

実際の CI システムが達成可能な柔軟性のレベルを確認したいなら、非常にロバストな Java 製のオープンソース CI システム Jenkins に目を通すことを勧める。Jenkins はプラグインを通じて拡張可能な基礎的 CI システムの例であり、ソースコードは GitHub から確認できる。もう一つ参考になるプロジェクトとして Travis CI がある。Travis CI は Ruby で書かれており、ソースコードは GitHub から確認できる。

CI システムの動作を理解し、自力で CI システムを実装する練習は以上で終了となる。高信頼な分散システムの構築で必要となる概念の理解を深められたと思うので、その知識を使えばより複雑な解決策を構築できるだろう。


  1. Bash を使うのは、ファイルの存在確認、ファイルの作成、Git の利用が必要で、それを行うには Bash スクリプトが最も直接的で簡単なためである。代わりにクロスプラットフォームの Python パッケージを使うこともできる。例えば Python に組み込みのモジュール os を使えばファイルシステムにアクセスでき、GitPython を使えば Git レポジトリにアクセスできる。ただ、実行はより遠回りになる。 ↩︎

広告