フローショップスケジューラ
フローショップスケジューラ
フローショップスケジューリング問題 (flow shop scheduling problem) はオペレーションリサーチの分野で最も挑戦的で最も研究されてきた問題の一つである。挑戦的な最適化問題でよくあるように、現実的なサイズのフローショップスケジューリング問題では最適解がまず求まらない。本章では局所探索法 (local search) と呼ばれるテクニックを利用するソルバの実装を紹介する。局所探索法を使うと、最適解を求められない状況でも「非常に優れた」解を見つけることができる。このソルバは指定された時間だけ新しい解を探索・評価し、最終的に得られた中で最も優れた解を返す。
局所探索法のアイデアは「手元にある解に似た別の解を試すことで、ヒューリスティック的に解を少しずつ改善する」というものである。本章で紹介するソルバは似た解を生成して評価する戦略、そして優れていそうな解を選ぶ戦略をそれぞれ複数持つ。実装は Python で書かれ、依存パッケージを持たない。知名度の低い Python の機能を活用することで、ソルバは探索の途中結果を利用して探索戦略を動的に変更する。
以降では、フローショップスケジューリング問題に関する背景知識を最初に解説する。続いて、一般的なソルバのコードと本章で使う近傍選択戦略と様々なヒューリスティックを詳しく見る。その後、様々な戦略を一つにまとめる動的な戦略選択の仕組みを考える。最後に、プロジェクトの概観と実装プロセスの中で得られた教訓をまとめる。
背景知識
フローショップスケジューリング問題
フローショップスケジューリング問題とは、いくつかのタスクから構成されるジョブがあるときに、ジョブ全体の完了時間が最小となるようにタスクの実行順序を決める最適化問題である。例えば、車を製造する組立ラインで車の各部分が違う機械で処理されるとする。要件は車種ごとに異なり、例えばフレームの色はそれぞれ異なる。この設定において、それぞれの車が新しいジョブであり、車の各部分がタスクとなる。また、全てのジョブは同じタスクを同じ順序で実行することで完了すると仮定される。
フローショップスケジューリング問題の目標は全てのジョブが持つ全てのタスクを完了させるまでの時間を最小化することである。この時間はメイクスパン (makespan) とも呼ばれる。この問題は様々な応用を持ち、特に製造施設の最適化と密接に関連する。
フローショップスケジューリング問題では \(n\) 台の機械と \(m\) 個のジョブが設定される。車の例で言えば、車を組み立てる機械の台数が \(n\) となり、製造すべき車の台数が \(m\) となる。どのジョブも同じ \(n\) 個のタスクから構成され、\(i\) 番目のタスクは \(i\) 台目の機械だけが実行でき、ジョブ \(j\) の \(i\) 番目のタスクにかかる時間は \(p(j, i)\) で不変だと仮定される。さらに、任意のジョブに属するタスクは機械と同じ順序で実行されなければならない。つまり、タスク \(i\) を完了させてからタスク \(i + 1\) を開始する必要がある。車の例で言えば、フレームが取り付けられていない状態でフレームの塗装を始めてはいけない。最後の制約として、一台の機械が同時に実行できるタスクは最大で一つである。
タスクの順序が事前に決まっているので、フローショップスケジューリング問題に対する解はジョブの置換で表現できる。どの機械を考えたとしても、その機械が処理する (タスクの属する) ジョブの順序は同じである。よってジョブの置換が与えられたとき、機械 \(i\) がジョブ \(j\) のタスク \(i\) に取り掛かる時刻は次の二つの時刻の遅い方と定まる:
- 機械 \(i\) が、ジョブ \(j - 1\) のタスク \(i\) (同じ機械の一つ前のタスク) を完了した時刻
- 機械 \(i-1\) が、ジョブ \(j\) のタスク \(i-1\) (同じジョブの一つ前のタスク) を完了した時刻
二つの時刻の最大値を取っているので、マシン \(i\) またはジョブ \(j\) には停止時間が生まれる。この待機時間が最終的に最小化したい値である。この時間が小さければそれだけ全体のメイクスパンも小さくなる。
問題の形が単純なので、ジョブの任意の置換が正当な解となり、最適解も何らかの置換だと分かる。よって、解を改善するにはジョブの置換を適当に変更して、その新しい置換のメイクスパンを計算すればよい。以降の議論では、ジョブの置換を候補 (candidate) と呼ぶ。
二つのジョブと二つの機械からなる簡単な例を考えよう。一つ目のジョブは二つのタスク \(A\), \(B\) を持ち、それぞれ完了までの所要時間は \(1\) 分と \(2\) 分だとする。二つ目のジョブは二つのタスク \(C\), \(D\) を持ち、完了までの所要時間はそれぞれ \(2\) 分と \(1\) 分だとする。問題の設定より、\(A\) は \(B\) よりも前に、\(C\) は \(D\) よりも前に完了しなければならない。ジョブの個数が \(2\) なので、ジョブの置換の個数は \(2! = 2\) である。二つ目のジョブを最初に配置するとメイクスパンは \(5\) となるのに対して、一つ目のジョブを最初に配置するとメイクスパンは \(4\) で済む:


いずれの場合でも、全てのタスクは可能な限り早く開始されることに注目してほしい。ここから、優れた置換とは機械の手が空く (処理すべきタスクが存在しない) 時間が少ない置換と言える。
局所探索法
局所探索法は最適化問題の最適解を求められないときに優れた解を求める戦略の一つである。直感的に言えば、局所探索法はそれなりに優れた適当な解から始めて、解を少しずつ改善する操作を繰り返すことで動作する。そのとき可能な全ての解の中から候補を選ぶのではなく、何らかの定義から定まる近傍 (neighbourhood) の中から候補を選択する: 特定の解の近傍とは、その解に「似ている」解の集合を言う。フローショップスケジューリング問題ではジョブの任意の置換が正当な解なので、ジョブの列をシャッフルする任意の仕組みを局所探索法の一部として利用できる (以降で見るように、これに近いことが実際に行われる)。
局所探索法を実行するには、いくつかの質問に答える必要がある:
- 最初に使うべき解はどれか?
- 解が与えられたとき、その解の近傍は具体的にどんな集合か?
- 近傍が与えられたとき、次に移るべき解をどのように選択するか?
以降の三つの節でこれらの問題を一つずつ議論する。
一般的なソルバ
本節ではフローショップスケジューラの一般的なフレームワークを紹介する。まず、ソルバの設定と必要なインポートを行う Python コードを示す:
import sys, os, time, random
from functools import partial
from collections import namedtuple
from itertools import product
import neighbourhood as neigh
import heuristics as heur
##########
## 設定 ##
##########
TIME_LIMIT = 300.0 # ソルバを実行する時間 (単位は秒)
TIME_INCREMENT = 13.0 # ヒューリスティックの変更頻度 (単位は秒)
DEBUG_SWITCH = False # True なら、実行中にヒューリスティックの情報を表示
MAX_LNS_NEIGHBOURHOODS = 1000 # LNS で探索する近傍の最大個数
詳しい説明が必要だと思われる設定が二つある。TIME_INCREMENT
は動的な戦略選択で利用され、MAX_LNS_NEIGHBOURHOODS
は近傍の選択戦略を調整する。両方とも以降でさらに詳しく説明される。
これらの設定はコマンドライン引数としてユーザーに公開することもできる。ただ今の段階では、コマンドライン引数で指定できるのは入力データのファイル名だけとする。入力データ ── Taillard ベンチマークセットから取ったもの ── はソルバが解くべき問題を表し、フローショップスケジューリング問題を表現するための標準的なフォーマットで書かれると仮定される。次のコードはコマンドラインから実行したときのエントリーポイントであり、コマンドライン引数の個数に応じて適切な関数を呼び出す:
if __name__ == '__main__':
if len(sys.argv) == 2:
data = parse_problem(sys.argv[1], 0)
elif len(sys.argv) == 3:
data = parse_problem(sys.argv[1], int(sys.argv[2]))
else:
print "\nUsage: python flow.py <Taillard problem file> [<instance number>]\n"
sys.exit(0)
(perm, ms) = solve(data)
print_solution(data, perm)
Taillard ベンチマークセットが使う問題ファイルのフォーマットをパースする方法は後述する。問題ファイルはオンラインで入手できる。
solve
メソッドに渡される data
はリストであり、各要素がジョブを表す。data
の要素は整数のリストであり、それぞれの整数は対応するジョブが特定のタスクにかける時間を表す。solve
メソッドは戦略の集合を表すグローバル変数 (後述) をまず初期化する。ここで重要なのは各戦略に関する統計情報を管理する strat_*
変数である。これらの変数が記録する情報は解の探索中に行われる動的な戦略の選択で利用される。
def solve(data):
""" フローショップスケジューリング問題のインスタンスを解く。 """
# 循環インポートの問題を避けるため、戦略の初期化をここで行う。
initialize_strategies()
global STRATEGIES
# 各戦略に対して、次の情報を記録する。
# improvements: この戦略による改善
# time_spent: この戦略が消費した時間
# weights: この戦略の優秀さに応じた重み
# usage: この戦略を利用した回数
strat_improvements = {strategy: 0 for strategy in STRATEGIES}
strat_time_spent = {strategy: 0 for strategy in STRATEGIES}
strat_weights = {strategy: 1 for strategy in STRATEGIES}
strat_usage = {strategy: 0 for strategy in STRATEGIES}
フローショップスケジューリング問題の優れた特徴の一つに、ジョブの全ての置換が正当な解となる事実がある。その多くは非常に長いタイムスパンを持つかもしれないが、その中の一つが必ず最適解となる。このため、考えている解が実現可能な解の空間に収まっているかどうかを確認する必要はない ── いつでも実現可能となる!
ただ、置換からなる空間で局所探索法を実行するには最初の置換を決める必要がある。ここでは単純に、ジョブのランダムな置換を初期値として局所探索法を開始する:
# ジョブのランダムな置換から開始する。
perm = range(len(data))
random.shuffle(perm)
続いて、これまでに見つかった最良の置換を記録する変数、そして出力の制御で使われる時刻情報を記録する変数を初期化する:
# 最良の解を記録する。
best_make = makespan(data, perm)
best_perm = perm
res = best_make
# 反復の統計と時刻に関する情報を記録する。
iteration = 0
time_limit = time.time() + TIME_LIMIT
time_last_switch = time.time()
time_delta = TIME_LIMIT / 10
checkpoint = time.time() + time_delta
percent_complete = 10
print "\nSolving..."
このプログラムは局所探索法を使ったソルバなので、後は指定された時間が経過するまで解の改善を試み続けるだけとなる。反復回数を記録し、定期的にソルバの進捗を出力する:
while time.time() < time_limit:
if time.time() > checkpoint:
print " %d %%" % percent_complete
percent_complete += 10
checkpoint += time_delta
iteration += 1
戦略の選び方は後で説明する。ここでは各戦略が neighbourhood
関数と heuristic
関数を提供することを理解すれば十分である。neighbourhood
関数は次の候補からなる集合を返し、heuristic
関数は候補の集合から最良の候補を選択する。この二つの関数を使って、新しい置換 perm
と新しいメイクスパン res
が計算される:
# ヒューリスティックを使って最良の戦略を選択する
strategy = pick_strategy(STRATEGIES, strat_weights)
old_val = res
old_time = time.time()
# 候補の集合 (選択された戦略の近傍) から、選択された戦略の
# ヒューリスティックを使って次の置換を選択する
candidates = strategy.neighbourhood(data, perm)
perm = strategy.heuristic(data, candidates)
res = makespan(data, perm)
メイクスパンの計算に難しいところはない: 最後のジョブの完了時刻を計算すればよい。次のコードで使われている compile_solution
関数の動作は以降で説明する。ここでは返り値が二次元配列であり、置換が表すスケジュールにおける最後のジョブの最後のタスクの開始時刻が [-1][-1]
要素だと理解しておいてほしい:
def makespan(data, perm):
""" 与えられた解のメイクスパンを計算する。 """
return compile_solution(data, perm)[-1][-1] + data[perm[-1]][-1]
続いて、戦略の選択で利用する次の統計情報を記録する:
- どれだけ解を改善したか
- どれだけの時間を計算に費やしたか
- これまでに何回使用されたか
また、現在の解よりも優れた解が見つかったときは最良の解を記録する変数を更新する:
# 利用した戦略に関する統計情報を更新する。
strat_improvements[strategy] += res - old_val
strat_time_spent[strategy] += time.time() - old_time
strat_usage[strategy] += 1
if res < best_make:
best_make = res
best_perm = perm[:]
戦略の選択で利用される重みは定期的に更新される。可読性のため関連するスニペットは省略し、詳細は以降で解説する。最後のステップとして、while
ループが完了する (制限時間に達する) と解の探索プロセスに関する統計情報を出力し、見つかった最良の置換とそのメイクスパンを返す:
print " %d %%\n" % percent_complete
print "\nWent through %d iterations." % iteration
print "\n(usage) Strategy:"
results = sorted([(strat_weights[STRATEGIES[i]], i)
for i in range(len(STRATEGIES))], reverse=True)
for (w, i) in results:
print "(%d) \t%s" % (strat_usage[STRATEGIES[i]], STRATEGIES[i].name)
return (best_perm, best_make)
問題ファイルのパース
問題ファイルをパースする関数 parse_problem
の入力は問題が格納されたファイルの名前と利用すべきインスタンスの番号である (ファイルには複数のインスタンスが含まれる)。
def parse_problem(filename, k=1):
"""
Taillard 問題ファイルの k 番目のインスタンスをパースして返す。
Taillard 問題ファイルはフローショップスケジューリング問題の
標準的なベンチマークセットである。
"""
print "\nParsing..."
まずファイルを読み込み、インスタンスを分離する行を設定する:
with open(filename, 'r') as f:
# インスタンスを分離する文字列
problem_line = ('/number of jobs, number of machines, initial seed, '
'upper bound and lower bound :/')
# ファイルを行ごとに分割し、さらに各行の両端にある空白文字を削除する。
lines = map(str.strip, f.readlines())
続いて、探しているインスタンスの特定を簡単にするため、全ての行を '/'
で連結する。こうするとインスタンスの先頭にある文字列 problem_line
による分割でファイルをインスタンスごとに分割できる。この分割を上手く動作させるには、最初の行の先頭に '/'
を追加する必要がある。また、指定された k
がファイルに含まれるインスタンスの個数を超えている場合も検出する:
# 次の処理のために最初の行を調整する。
lines[0] = '/' + lines[0]
# ファイルに含まれないと分かっている文字 '/' を分離文字として使いながら、
# k 番目のインスタンスが含まれる部分を切り出す。
try:
lines = '/'.join(lines).split(problem_line)[k].split('/')[2:]
except IndexError:
max_instances = len('/'.join(lines).split(problem_line)) - 1
print "\nError: Instance must be within 1 and %d\n" % max_instances
sys.exit(0)
次に、各タスクの所要時間を表す文字列を整数に変換し、結果をリストに格納する。最後に、データを zip
して列と行を入れ替え、solve
関数が期待する (data
の各要素がジョブに対応する) フォーマットに変換したものを返す:
# スペースで行を分割し、各要素を int に変換する。
data = [map(int, line.split()) for line in lines]
# zip を適用して行と列を入れ替える。
# 返り値の各要素は特定のジョブの各タスクの所要時間が入ったリストとなる。
return zip(*data)
解の組み立て
フローショップスケジューリング問題に対する解は全てのジョブに属する全てのタスクの正確な開始時刻である。この解をコード上ではジョブの置換として暗黙に表現しているので、置換を各タスクの開始時刻に変換する compile_solution
関数が用意される。この関数は問題を表すデータ (各タスクの所要時間) とジョブの置換を入力に受け取る。
compile_solution
関数は最初に各タスクの開始時刻を格納するデータ構造を初期化し、置換の最初のジョブに含まれるタスクの開始時刻を計算する:
def compile_solution(data, perm):
""" ジョブの置換を受け取り、機械のスケジュールを組み立てる。 """
num_machines = len(data[0])
# 注意: [[]] * m とはできない。
# そうすると同じリストを m 個持つリストが作成されてしまう。
machine_times = [[] for _ in range(num_machines)]
# 最初のジョブに含まれるタスクを各機械に割り当てる。
machine_times[0].append(0)
for mach in range(1,num_machines):
# このタスクを開始できるのは、一つ前のタスクが終わったとき
machine_times[mach].append(machine_times[mach-1][0] +
data[perm[0]][mach-1])
続いて残りのジョブのタスクを処理する。各ジョブの最初のタスクは、一つ前のジョブの最初のタスクが終わったときに開始できる。他のタスクは可能な限り早く開始される: 同じジョブの一つ前のタスクの完了時刻と、一つ前のジョブの同じタスクの完了時刻のうち遅い方と同じ時刻に開始される。
# 残りのジョブの開始時刻を割り当てる。
for i in range(1, len(perm)):
# 最初の機械では停止時間が生じない。
job = perm[i]
machine_times[0].append(machine_times[0][-1] + data[perm[i-1]][0])
# それ以外の機械では、次の二つのうち遅い方の時刻にタスクが開始される:
# - 同じジョブの一つ前のタスクが完了した時刻
# - 一つ前のジョブの同じタスクが完了した時刻
for mach in range(1, num_machines):
machine_times[mach].append(max(
machine_times[mach-1][i] + data[perm[i]][mach-1],
machine_times[mach][i-1] + data[perm[i-1]][mach]))
return machine_times
解の出力
ソルバの実行が完了すると、プログラムは得られた解の情報をコンパクトな形式で出力する。全ジョブの全タスクの開始時刻をユーザーに見せるのではなく、次の情報を出力する:
- 最良のメイクスパンを達成するジョブの置換
- その置換から計算されるメイクスパン
- 各機械の開始時刻、完了時刻、停止時間
- 各ジョブの開始時刻、完了時刻、停止時間
ジョブあるいはマシンの開始時刻とは、ジョブの最初のタスクあるいはマシンが初めて処理するタスクの開始時刻を意味する。同様に、ジョブあるいはマシンの完了時刻とは、ジョブの最後のタスクあるいはマシンが最後に処理するタスクの完了時刻を意味する。ジョブあるいはマシンの停止時間とは、ジョブのタスクが処理を受けていない時間あるいは機械がタスクを処理していない時間の合計を意味する。停止時間が短くなれば全体の処理時間も短くなるので、停止時間は短い方が望ましい。
解を組み立てる (各タスクの開始時刻を計算する) コードは既に議論したので、置換とメイクスパンの出力は非常に簡単である:
def print_solution(data, perm):
""" 計算された解に関する情報を出力する。 """
sol = compile_solution(data, perm)
print "\nPermutation: %s\n" % str([i+1 for i in perm])
print "Makespan: %d\n" % makespan(data, perm)
マシンとジョブの開始時刻、完了時刻、停止時間の出力では Python のフォーマット文字列を利用する。ジョブの停止時間はジョブの開始時刻と完了時刻の差からジョブに属するタスクの所要時間の和を引いた値に等しい。機械の停止時間も同様の方法で計算できる。
row_format ="{:>15}" * 4
print row_format.format('Machine', 'Start Time', 'Finish Time', 'Idle Time')
for mach in range(len(data[0])):
finish_time = sol[mach][-1] + data[perm[-1]][mach]
idle_time = (finish_time - sol[mach][0]) - sum([job[mach] for job in data])
print row_format.format(mach+1, sol[mach][0], finish_time, idle_time)
results = []
for i in range(len(data)):
finish_time = sol[-1][i] + data[perm[i]][-1]
idle_time = (finish_time - sol[0][i]) - sum([time for time in data[perm[i]]])
results.append((perm[i]+1, sol[0][i], finish_time, idle_time))
print "\n"
print row_format.format('Job', 'Start Time', 'Finish Time', 'Idle Time')
for r in sorted(results):
print row_format.format(*r)
print "\n\nNote: Idle time does not include initial or final wait time.\n"
近傍
局所探索法のアイデアは、手元にある解の近くにある解に少しずつ移動するというものである。ある解の近傍 (neighbourhood) とは、その解に「近い」とみなされる解の集合を言う。本節では考えられる近傍の定義を簡単なものから順に見ていく。
一つ目の近傍の定義は指定された個数のランダムな置換を近傍と定める。この定義は手元にある解を全く考慮しないので、「近傍」とは呼べないかもしれない。しかし、検索空間の広い範囲を探索するために検索にランダム性を加えることは優れたプラクティスとされている。
def neighbours_random(data, perm, num = 1):
# num 個のランダムなジョブの置換を返す。
# 返り値のリストは perm を含む。
candidates = [perm]
for i in range(num):
candidate = perm[:]
random.shuffle(candidate)
candidates.append(candidate)
return candidates
次の定義は、置換に含まれる任意の二つのジョブを入れ替えて得られる置換が近傍に属する定める。以下のコードでは itertools
パッケージの combinations
関数を用いることで、添え字の組を走査して対応する二つの要素を入れ替えた新しい置換を得る処理を簡潔に表現している。言い換えれば、この定義は手元にある置換とよく似た置換を「近い」とみなす。
def neighbours_swap(data, perm):
# 任意の要素の組を入れ替えた置換全体からなるリストを返す。
candidates = [perm]
for (i,j) in combinations(range(len(perm)), 2):
candidate = perm[:]
candidate[i], candidate[j] = candidate[j], candidate[i]
candidates.append(candidate)
return candidates
次の近傍の定義は考えている問題に固有の情報を考慮する。具体的には、待機時間が長いジョブを見つけ、そのジョブを任意に入れ替えた置換が近傍に属すると定める。引数の size
は考えるジョブの個数を表す: 待機時間が長い順に size
個のジョブを取り、それらを入れ替えた置換が計算される。
def neighbours_idle(data, perm, size=4):
# 待機時間が長い size 個のジョブを入れ替えた置換を返す。
candidates = [perm]
# 各ジョブの待機時間を計算する。
sol = flow.compile_solution(data, perm)
results = []
for i in range(len(data)):
finish_time = sol[-1][i] + data[perm[i]][-1]
idle_time = (finish_time - sol[0][i]) - sum([t for t in data[perm[i]]])
results.append((idle_time, i))
続いて待機時間が上位の size
個のジョブを計算する。
# 待機時間が長い size 個のジョブを取る。
subset = [job_ind for (idle, job_ind) in reversed(sorted(results))][:size]
最後に、これまでの処理で特定された待機時間が長いジョブを任意に入れ替えた置換を生成することで近傍を構築する。置換の計算では itertools
パッケージの permutations
関数が利用される:
# 待機時間が長いジョブを入れ替えた置換を列挙する。
for ordering in permutations(subset):
candidate = perm[:]
for i in range(len(ordering)):
candidate[subset[i]] = perm[ordering[i]]
candidates.append(candidate)
return candidates
最後の近傍の定義は、巨大近傍探索 (Large Neighbourhood Search, LNS) と呼ばれる手法で使われる定義である。簡単に言えば、LNS は特定のジョブの部分集合だけに注目することで動作する ── その部分集合の要素の最良の置換を求め、その置換を元の (ジョブ全体の) 置換に適用して得られる置換が LNS 近傍に属する。この処理をいくつかの (あるいは全ての) 特定のサイズの部分集合に対して繰り返せば、近傍に属する置換を増やせる。近傍のサイズは非常に速く大きくなるので、パラメータ MAX_LNS_NEIGHBOURHOODS
が上限として設定される。LNS の最初のステップは itertools
パッケージの combinations
関数を使ったランダムなジョブの部分集合の計算である:
def neighbours_LNS(data, perm, size = 2):
# 巨大近傍探索における近傍を返す。
candidates = [perm]
# 近傍の個数が多くなりすぎないように部分集合のサイズを制限する。
neighbourhoods = list(combinations(range(len(perm)), size))
random.shuffle(neighbourhoods)
続いて、計算された部分集合のそれぞれに対して、その部分集合の要素の置換の中で最良のものを求める。待機時間の長いジョブからなる部分集合に対して同様の処理を行うコードは以前に見た。重要な違いとして、ここでは部分集合ごとに最良の置換しか記録しない。こうすることで巨大な範囲の解を含む近傍が構築される。
for subset in neighbourhoods[:flow.MAX_LNS_NEIGHBOURHOODS]:
# 部分集合ごとに、最良の候補を記録する。
best_make = flow.makespan(data, perm)
best_perm = perm
# 注目している部分集合の全ての置換を考える。
for ordering in permutations(subset):
candidate = perm[:]
for i in range(len(ordering)):
candidate[subset[i]] = perm[ordering[i]]
res = flow.makespan(data, candidate)
if res < best_make:
best_make = res
best_perm = candidate
# 最良の置換だけを LNS としての最終的な候補に加える。
candidates.append(best_perm)
return candidates
size
パラメータをジョブの個数と同じ設定すると、ジョブの置換を全て走査した後に最良のものが選択される。しかし現実的な設定では size
の値は 3
か 4
に設定される。それより大きくすると neighbours_LNS
の実行時間が長くなりすぎてしまう。
ヒューリスティック
本章の文脈におけるヒューリスティックとは、候補の集合を受け取って一つの候補を選択する処理を言う。ヒューリスティックには問題データに対するアクセスも与えられ、それを使ってどの候補を選択すべきかを評価する。
一つ目のヒューリスティック heur_random
は候補を評価することなくランダムに選択する:
def heur_random(data, candidates):
# 候補をランダムに選んで返す。
return random.choice(candidates)
二つ目のヒューリスティック heur_hillclimbing
は heur_random
の真逆で、全ての候補のメイクスパンを計算した上で最良の候補を選択する。次のコードでリスト scores
の要素は (make, perm)
の形をしたタプルであり、make
は置換 perm
のメイクスパンであることに注意してほしい。このタプルを要素とするリストをソートすると、最も短いメイクスパンが含まれる組が先頭に来る。返り値の置換はソート後のリストで先頭にあるタプルの第二要素である。
def heur_hillclimbing(data, candidates):
# 候補の中で最良の置換を返す。
scores = [(flow.makespan(data, perm), perm) for perm in candidates]
return sorted(scores)[0][1]
最後のヒューリスティック heur_random_hillclimbing
はこれまでの二つを組み合わせる。局所探索法では候補をランダムに選べばいいわけではないし、かといって最良の候補を選べばいいわけでもない。heur_random_hillclimbing
ヒューリスティックは \(0.5\) の確率で最良の候補を選択し、\(0.25\) の確率で二番目に良い候補を選択し、以下同様とすることで「それなりに優れた」候補をランダムに選択する。コード中の while
ループはコインを投げて表が出続ける間インデックスを増加させるのに等しい。最終的にコインの裏が出てループを脱出した時点のインデックスを使って最終的な候補が選択される。
def heur_random_hillclimbing(data, candidates):
# メイクスパンでソートしたときの順位に反比例する確率で候補を選択して返す。
scores = [(flow.makespan(data, perm), perm) for perm in candidates]
i = 0
while (random.random() < 0.5) and (i < len(scores) - 1):
i += 1
return sorted(scores)[i][1]
メイクスパンは最適化対象の値なので、優れたメイクスパンを持つ候補を選択すれば局所探索のプロセスが好転する可能性が高い。そこにランダム性を加えれば、どんなときも近傍の中で最良な候補に向かう盲目的な戦略を離れ、近傍を広く探索できるようになる。
動的な戦略選択
局所探索法を使って優れた置換を探索するアルゴリズムの中心部にあるのは、何らかの近傍生成関数とヒューリスティックを使って現在の解から別の解にジャンプする処理である。しかし、近傍生成関数とヒューリスティックの選択肢はいくつもある。どれを使えばいいだろうか? 現実的な問題では、探索中に戦略を切り替えると上手く行くことが多い。私たちの実装が用いる動的な戦略選択は探索の途中結果を参照し、使用する近傍生成関数とヒューリスティックを調子の良いものに切り替える。戦略 (strategy) とは近傍生成関数とヒューリスティック (およびそれらのパラメータ) の設定を言う。
まず、探索で利用可能な戦略を表すデータを構築するコードがある。この戦略の初期化コードでは、functools
パッケージの partial
関数を使って近傍生成関数にパラメータが部分適用される。ヒューリスティック関数のリストも作成され、最後に product
関数を使って作成される近傍生成関数とヒューリスティック関数の任意の組が新しい戦略として記録される。
##########
## 戦略 ##
########################################################
## 戦略 (strategy) は近傍 (次の候補の集合) 生成関数と
## ヒューリスティック (候補の選択) 関数の組である。
##
STRATEGIES = []
# 名前付きタプルを使うと、辞書を使うよりも少しだけコードが読みやすくなる。
# 例えば strategy['name'] は strategy.name となる。
Strategy = namedtuple('Strategy', ['name', 'neighbourhood', 'heuristic'])
def initialize_strategies():
global STRATEGIES
# これから利用する近傍生成関数を (パラメータと共に) 定義する。
neighbourhoods = [
('Random Permutation', partial(neigh.neighbours_random, num=100)),
('Swapped Pairs', neigh.neighbours_swap),
('Large Neighbourhood Search (2)', partial(neigh.neighbours_LNS, size=2)),
('Large Neighbourhood Search (3)', partial(neigh.neighbours_LNS, size=3)),
('Idle Neighbourhood (3)', partial(neigh.neighbours_idle, size=3)),
('Idle Neighbourhood (4)', partial(neigh.neighbours_idle, size=4)),
('Idle Neighbourhood (5)', partial(neigh.neighbours_idle, size=5))
]
# これから利用するヒューリスティック関数を定義する。
heuristics = [
('Hill Climbing', heur.heur_hillclimbing),
('Random Selection', heur.heur_random),
('Biased Random Selection', heur.heur_random_hillclimbing)
]
# 両者の任意の組が戦略となる。
for (n, h) in product(neighbourhoods, heuristics):
STRATEGIES.append(Strategy("%s / %s" % (n[0], h[0]), n[1], h[1]))
これで戦略が定義できた。では、どれを使うべきだろうか? 探索で一つの戦略だけを使うのは望ましくないものの、戦略をランダムに使えばいいわけでもなさそうに思える。そこで、各戦略に重み (weight) を割り当て、その重みを使って戦略を選択する仕組みを導入する (重みの計算については後述)。次の pick_strategy
関数は戦略のリストと対応する重みのリストを受け取る。重みは相対的なので、どんな値を使っても構わない。
重みを考慮しつつランダムに戦略を選ぶアルゴリズムは次の通りである。まず、\(0\) から重みの和までの一様乱数を生成して \(x\) とする。次に、\(i\) 番目までの戦略の和が \(x\) より大きくなる自然数 \(i\) の最小値を求める。この最小値がランダムに選択された戦略の添え字となる。「ルーレット選択 (roulette wheel selection)」などと呼ばれるこのアルゴリズムは、大きな重みを持つ要素を高い確率で選択する。
def pick_strategy(strategies, weights):
# ルーレット選択: 重みを考慮しつつランダムに戦略を選択する。
# 戦略をランダムに選択するのではなく、これまでに優れた結果を示している戦略を
# (大きな重みを通して) 多く選択する。
total = sum([weights[strategy] for strategy in strategies])
pick = random.uniform(0, total)
count = weights[strategies[0]]
i = 0
while pick > count:
count += weights[strategies[i+1]]
i += 1
return strategies[i]
最後に残ったのは解の探索中に重みを計算する方法である。重みの計算はソルバのメインループで一定の時間 TIME_INCREMENT
が経過するたびに行われる:
# 戦略の重みを定期的に再計算する。
# こうすることで、優れた結果を示す戦略が探索で使われやすくなる。
if time.time() > time_last_switch + TIME_INCREMENT:
time_last_switch = time.time()
strat_improvements
には現在のインターバルでそれぞれの戦略が達成した改善の和が記録され、strat_time_spent
には現在のインターバルで各戦略が費やした計算時間の和が記録されることを思い出してほしい。改善の和を計算時間の和で割って正規化した値が戦略の評価基準として利用される。前回の重みの計算から戦略が一度も使われない可能性もあるので、小さいデフォルト値が設定される。
# 改善量を計算時間で割って正規化する。
results = sorted([
(float(strat_improvements[s]) / max(0.001, strat_time_spent[s]), s)
for s in STRATEGIES])
現在のインターバルにおける戦略の成績ランキングがこれで作成できた。戦略が \(k\) 個あるとき、最も優れた戦略の重みには \(k\) が加算され、二番目に優れた戦略の重みには \(k-1\) が加算され、以下同様となる。全ての戦略の重みは大きくなり、最も成績の悪い戦略の重みは \(1\) だけ大きくなる。
# 成績の良い戦略の重みを相対的に増加させる。
for i in range(len(STRATEGIES)):
strat_weights[results[i][1]] += len(STRATEGIES) - i
追加の補正として、使用されなかった戦略の重みを無条件に大きくする。これは戦略が完全に忘れらることを防ぐためである。初期に成績の悪い戦略も後に優れた成績を収めるかもしれない。
# 未使用の戦略の重みを増加させ、飢餓を防ぐ。
if results[i][0] == 0:
strat_weights[results[i][1]] += len(STRATEGIES)
最後に、戦略のランキングに関する情報を (DEBUG_SWITCH
フラグが設定されているなら) 出力し、strat_improvements
と strat_time_spent
をリセットして次のインターバルに備える。
if DEBUG_SWITCH:
print "\nComputing another switch..."
print "Best: %s (%d)" % (results[0][1].name, results[0][0])
print "Worst: %s (%d)" % (results[-1][1].name, results[-1][0])
print results
print sorted([strat_weights[STRATEGIES[i]]
for i in range(len(STRATEGIES))])
strat_improvements = {strategy: 0 for strategy in STRATEGIES}
strat_time_spent = {strategy: 0 for strategy in STRATEGIES}
議論
本章では比較的少量のコードでフローショップスケジューリング問題という複雑な最適化問題を解くソルバを見た。こういった大規模な最適化問題で最適解を見つけることは難しいので、局所探索法などの近似テクニックを使って非常に優れた解を見つけることになる。局所探索法は解空間を少しずつ移動することで優れた解を見つけようとする。
局所探索法の背後にある一般的な考え方は幅広い問題に適用できる。本章で注目したのは、問題に対する一つの解から似た解の近傍を生成する処理、そして解の候補を評価して比較する方法だった。この二つがあれば、最適解の計算が不可能な場合でも局所探索法のパラダイムを使って十分な価値のある解を計算できる。
問題を解くのに一つの戦略を使うのではなく、複数の戦略を用意して探索しながら途中結果に応じて利用する戦略を切り替える手法も見た。この単純で強力な手法を使うと考えている問題に対する完全でない戦略を組み合わせることができる上に、開発者は戦略を組み合わせるコードを書く必要がない。