多執行緒生成#

四個核心分佈(randomstandard_normalstandard_exponentialstandard_gamma)都允許使用 out 關鍵字引數來填滿現有的陣列。現有的陣列需要是連續且行為良好的(可寫入和對齊)。在正常情況下,使用常見的建構子(例如 numpy.empty)建立的陣列將滿足這些要求。

此範例使用 Python 3 的 concurrent.futures 來使用多個執行緒填滿陣列。執行緒是長生命週期的,因此重複呼叫不需要來自執行緒建立的任何額外負擔。

產生的隨機數是可重現的,因為在執行緒數量不變的情況下,相同的種子會產生相同的輸出。

from numpy.random import default_rng, SeedSequence
import multiprocessing
import concurrent.futures
import numpy as np

class MultithreadedRNG:
    def __init__(self, n, seed=None, threads=None):
        if threads is None:
            threads = multiprocessing.cpu_count()
        self.threads = threads

        seq = SeedSequence(seed)
        self._random_generators = [default_rng(s)
                                   for s in seq.spawn(threads)]

        self.n = n
        self.executor = concurrent.futures.ThreadPoolExecutor(threads)
        self.values = np.empty(n)
        self.step = np.ceil(n / threads).astype(np.int_)

    def fill(self):
        def _fill(random_state, out, first, last):
            random_state.standard_normal(out=out[first:last])

        futures = {}
        for i in range(self.threads):
            args = (_fill,
                    self._random_generators[i],
                    self.values,
                    i * self.step,
                    (i + 1) * self.step)
            futures[self.executor.submit(*args)] = i
        concurrent.futures.wait(futures)

    def __del__(self):
        self.executor.shutdown(False)

多執行緒隨機數生成器可用於填滿陣列。values 屬性顯示填滿前的零值和填滿後的隨機值。

In [2]: mrng = MultithreadedRNG(10000000, seed=12345)
   ...: print(mrng.values[-1])
Out[2]: 0.0

In [3]: mrng.fill()
   ...: print(mrng.values[-1])
Out[3]: 2.4545724517479104

可以使用多個執行緒產生的時間與使用單一執行緒生成所需的時間進行比較。

In [4]: print(mrng.threads)
   ...: %timeit mrng.fill()

Out[4]: 4
   ...: 32.8 ms ± 2.71 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

單執行緒呼叫直接使用 BitGenerator。

In [5]: values = np.empty(10000000)
   ...: rg = default_rng()
   ...: %timeit rg.standard_normal(out=values)

Out[5]: 99.6 ms ± 222 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

即使對於僅為中等大小的陣列,增益也相當顯著,且擴展是合理的。由於陣列建立的額外負擔,與不使用現有陣列的呼叫相比,增益甚至更大。

In [6]: rg = default_rng()
   ...: %timeit rg.standard_normal(10000000)

Out[6]: 125 ms ± 309 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

請注意,如果使用者未設定 threads,則將由 multiprocessing.cpu_count() 確定。

In [7]: # simulate the behavior for `threads=None`, if the machine had only one thread
   ...: mrng = MultithreadedRNG(10000000, seed=12345, threads=1)
   ...: print(mrng.values[-1])
Out[7]: 1.1800150052158556