Eventlet でネイティブスレッドをブロックするコードを扱う方法
Eventlet のように、グリーンスレッドで並行処理を行っている場合、そのグリーンスレッド群が動作しているネイティブスレッドがブロックすると処理全体が止まってしまう。 通常は、モンキーパッチによってネイティブスレッドがブロックするコードは全て書き換えられ、ブロックせずに別のグリーンスレッドにコンテキストスイッチするような動作になる。 しかし、これにも一部例外がある。Python は高速化や元々 C で書かれているライブラリの利用などを意図して C 拡張モジュールを使えるようになっている。 C で書かれたモジュールにはモンキーパッチが当てられない。もし C 拡張モジュールの中がブロックするようなコードになっていた場合、これは重要な問題となる。 身近な例では RDB に接続するドライバはよく C 拡張モジュールを使って書かれており、処理の性質上ブロックすることも避けがたい。 では、この問題には解決方法がないのかというと、そんなことはないよというのが今回の内容。
とりあえず Eventlet をインストールしておく。
まず、次のようなコードを書いてみる。 このコードでは、Eventlet がモンキーパッチを当てる前のオリジナルな time モジュールを使って sleep をかけることで、ネイティブスレッドを意図的にブロックしている。 C 拡張モジュールを書くのが面倒なので、以降はネイティブスレッドをブロックするにはこのやり方を使う。
#!/usr/bin/env python # -*- coding: utf-8 -*- import time import eventlet eventlet.monkey_patch() # モンキーパッチを当てる前のオリジナルのモジュールを取り出す from eventlet import patcher orig_time = patcher.original('time') def greenthread1(): while True: print('Green thread: 1') time.sleep(1) def greenthread2(): while True: print('Green thread: 2') time.sleep(1) if __name__ == '__main__': eventlet.spawn(greenthread1) eventlet.spawn(greenthread2) while True: print('Native thread: 1') # グリーンスレッドが動いているネイティブスレッドをブロックさせる orig_time.sleep(1)
Native thread: 1 Native thread: 1 Native thread: 1
ネイティブスレッドがブロックしている上に、グリーンスレッドは切り替えのタイミングも与えられていないため、全く並行処理が行われなくなってしまっている。
一応、上記のコードでも、グリーンスレッドを切り替えるタイミングさえ与えてやれば、並行処理が全くできないわけではない。 例えば、上記を少しだけ変更して、以下のようにしてみる。
#!/usr/bin/env python # -*- coding: utf-8 -*- import time import eventlet eventlet.monkey_patch() # モンキーパッチを当てる前のオリジナルのモジュールを取り出す from eventlet import patcher orig_time = patcher.original('time') def greenthread1(): while True: print('Green thread: 1') time.sleep(1) def greenthread2(): while True: print('Green thread: 2') time.sleep(1) if __name__ == '__main__': eventlet.spawn(greenthread1) eventlet.spawn(greenthread2) while True: print('Native thread: 1') # グリーンスレッドが動いているネイティブスレッドをブロックさせる orig_time.sleep(3) # 明示的にグリーンスレッドを切り替える time.sleep(0)
Native thread: 1 Green thread: 1 Green thread: 2 Native thread: 1 Green thread: 1 Green thread: 2 Native thread: 1
今度はグリーンスレッドの並行処理も一応は進んでいる。 しかし、本来意図している 1 秒毎の処理ではなく 3 秒毎に表示されているようだ。 これは、ネイティブスレッド自体がブロックしている 3 秒の間はグリーンスレッドは処理が行えていないため。
つまり、グリーンスレッドは自身が動作しているネイティブスレッドがブロックしてしまうと、いずれにせよ都合が悪い。 なので、グリーンスレッドとネイティブスレッドを併用する場合には、グリーンスレッドが動作しているのとは別のネイティブスレッドを走らせる。 次のコードでは、ネイティブスレッドをブロックする処理の実行を、グリーンスレッドが動作しているのとは別のネイティブスレッド上で実行させてみる。
#!/usr/bin/env python # -*- coding: utf-8 -*- import time import eventlet eventlet.monkey_patch() # モンキーパッチを当てる前のオリジナルのモジュールを取り出す from eventlet import patcher orig_threading = patcher.original('threading') orig_time = patcher.original('time') def greenthread1(): while True: print('Green thread: 1') time.sleep(1) def greenthread2(): while True: print('Green thread: 2') time.sleep(1) def nativethread1(): while True: print('Native thread: 1') orig_time.sleep(3) def native_spawn(): native_thread = orig_threading.Thread(target=nativethread1) native_thread.start() if __name__ == '__main__': eventlet.spawn(greenthread1) eventlet.spawn(greenthread2) native_spawn() while True: time.sleep(1)
Native thread: 1Green thread: 1 Green thread: 2 Green thread: 1 Green thread: 2 Green thread: 1 Green thread: 2 Native thread: 1 Green thread: 1 Green thread: 2 Green thread: 1 Green thread: 2 Green thread: 1 Green thread: 2 Native thread: 1
コンソールが (ネイティブ) スレッドセーフではないため、一部表示が崩れているが、なんにせよ本来意図していたようにネイティブスレッドは 3 秒間隔、グリーンスレッドは 1 秒間隔で表示を続けている。
ネイティブスレッドがブロックする処理はグリーンスレッドが動作しているのとは別のネイティブスレッドで実行すれば良いことがわかった。 次はグリーンスレッドとネイティブスレッドの間でネイティブスレッドをブロックするようなジョブの受け渡しをしてみる。
#!/usr/bin/env python # -*- coding: utf-8 -*- import eventlet eventlet.monkey_patch() import time import random from eventlet.event import Event # モンキーパッチを当てる前のオリジナルのモジュールを取り出す from eventlet import patcher orig_threading = patcher.original('threading') orig_time = patcher.original('time') orig_Queue = patcher.original('Queue') # グリーンスレッド・ネイティブスレッド間で仕事の要求・結果を受け渡しするためのキュー _REQUEST_Q = orig_Queue.Queue(maxsize=-1) _RESPONSE_Q = orig_Queue.Queue(maxsize=-1) # ジョブをネイティブスレッドに依頼するグリーンスレッド def put_green_thread(): while True: e = Event() # 1 から 3 秒の間でランダムにネイティブスレッドをブロックするジョブを作る job = (e, blocking_native_thread_job, random.randint(1, 3)) # ジョブを登録する _REQUEST_Q.put(job) # ジョブの完了を待つ result = e.wait() # 結果を表示する print(result) # ネイティブスレッドで処理が完了したジョブを取り出すグリーンスレッド def get_green_thread(): while True: while not _RESPONSE_Q.empty(): try: # 処理済みのジョブはノンブロックで取り出す (重要) e, result = _RESPONSE_Q.get(block=False) except orig_Queue.Empty: pass # ジョブが完了したことを待っているグリーンスレッドに通知する e.send(result) # グリーンスレッドを切り替える time.sleep(0) # C 拡張モジュール内で行われるような、ネイティブスレッドをブロックする仕事 def blocking_native_thread_job(block_time): # 一定時間ネイティブスレッドをブロックする orig_time.sleep(block_time) # 処理結果としてメッセージを返す return 'Done, blocked %d sec' % block_time # ジョブを処理するネイティブスレッド def native_thread(): while True: # ジョブをキューから取り出す e, job, args = _REQUEST_Q.get(block=True) # ジョブをネイティブスレッドで処理する result = job(args) # 処理結果をキューに詰める _RESPONSE_Q.put((e, result)) def native_spawn(): thread = orig_threading.Thread(target=native_thread) thread.start() def print_green_thread(): while True: print('Green thread') time.sleep(1) if __name__ == '__main__': eventlet.spawn(put_green_thread) eventlet.spawn(get_green_thread) eventlet.spawn(print_green_thread) native_spawn() while True: time.sleep(1)
Green thread Green thread Done, blocked 1 sec Green thread Green thread Green thread Done, blocked 3 sec Green thread Done, blocked 1 sec
上手く動いてそう。
ただ、上記のコードは正直言ってかなり面倒くさい。 実は同様のことを実現する機能は既に Eventlet で用意されている。 eventlet.tpool を使うことでネイティブスレッドのスレッドプールを経由して処理を実行できる。
#!/usr/bin/env python # -*- coding: utf-8 -*- import eventlet from eventlet import tpool eventlet.monkey_patch() # モンキーパッチを当てる前のオリジナルのモジュールを取り出す from eventlet import patcher orig_time = patcher.original('time') import time import random # ジョブをネイティブスレッド (プール) に依頼するグリーンスレッド def put_green_thread(): while True: result = tpool.execute(blocking_native_thread_job, random.randint(1, 3)) # 結果を表示する print(result) def print_green_thread(): while True: print('Green thread') time.sleep(1) # C 拡張モジュール内で行われるような、ネイティブスレッドをブロックする仕事 def blocking_native_thread_job(block_time): # 一定時間ネイティブスレッドをブロックする orig_time.sleep(block_time) # 処理結果としてメッセージを返す return 'Done, blocked %d sec' % block_time if __name__ == '__main__': eventlet.spawn(put_green_thread) eventlet.spawn(print_green_thread) while True: time.sleep(1)
前述したコードよりも、自分で書いていない分、だいぶスッキリした。
実行してみる。
Green thread Green thread Green thread Done, blocked 3 sec Green thread Green thread Done, blocked 1 sec Green thread
よさそう。めでたしめでたし。
ちなみに上記の tpool を使って RDB との接続を処理できるようにした db_pool というモジュールも Eventlet には用意されている。 RDB のドライバが Pure Python でないときは、これを使えばよさそうだ。