# This program assumes a tick interval of 1 minute. That is, the events are # made to evolve on a one dimensional time grid of 1 minute resolution. import os import multiprocessing as mp import typing as T import numpy as np DEFAULT_SEED = 1312 DEFAULT_PARALLELISM = mp.cpu_count() DEFAULT_RUNS = 1000 class Simulation(object): def __init__( self, queue: mp.Queue, runs: int, generator: np.random.Generator, snail_len: float, wheel_len: float, wheel_dist: float, road_len: float, snail_speed: float, ) -> None: self.queue = queue self.runs = runs self.generator = generator self.snail_len = snail_len / road_len self.wheel_len = wheel_len / road_len self.wheel_dist = wheel_dist / road_len self.starting_wheel_pos_mul = (road_len - 2 * wheel_len - wheel_dist) / road_len self.snail_speed = snail_speed / road_len def is_car_passing(self) -> bool: """ The probability a car is passing every minute is assumed to be uniformly distributed in the hour. Having 10 cars/hour, and a tick interval of 1 minute we have: P_car = 10/60 ~ 0.1666666 """ return self.generator.random() <= 1 / 6 def is_dead(self, x: float) -> bool: """ The probability of the car hitting the snail being at x is the probability of any of the pieces of the two wheels to be in the interval [x-snail_len, x]. | | |-------------------------------------------------------| road_len |------- x | | @@@ | |---- x-snail_len | | | | | |---------- x_wheel_pos | | XXXXXXX--(wheel_dist)--XXXXXXX | | ------- wheel_len ------- | | | """ x_wheel_pos = self.generator.random() * self.starting_wheel_pos_mul first_wheel_hit = self.intersect(x, x_wheel_pos) second_wheel_hit = self.intersect(x, x_wheel_pos + self.wheel_dist) return first_wheel_hit or second_wheel_hit def intersect(self, x_snail: float, x_wheel: float) -> bool: return (x_wheel > x_snail - self.snail_len) and (x_wheel < x_snail) def _run(self) -> bool: """ This is the entrypoint of the single run of a simulation. The result is a boolean indicating whether the snail did survive the road crossing. The simulation ends if the snail manages to run all the road_len+snail_len distance. """ win_dist = 1 + self.snail_len x = 0 while x < win_dist: if self.is_car_passing() and self.is_dead(x): return False x += self.snail_speed return True def run(self) -> None: for i in range(self.runs): self.queue.put(self._run()) self.queue.close() def clear(): os.system("cls" if os.name == "nt" else "clear") def print_result(runs: int, survived: int) -> None: print(f"total runs: {runs}") print(f"result: {survived/runs}") def gather(q: mp.Queue()) -> None: runs = 0 survived = 0 while True: data = q.get() if data is None: break runs += 1 if data: survived += 1 print("============= FINAL RESULT ============") print_result(runs, survived) if __name__ == "__main__": seed = int(os.getenv("SEED") or DEFAULT_SEED) procs = int(os.getenv("PARALLELISM") or DEFAULT_PARALLELISM) runs = int(os.getenv("RUNS") or DEFAULT_RUNS) q = mp.Queue() seq = np.random.default_rng(int(seed)) rng_stream = seq.spawn(int(procs)) snail_len = 5 # 5 cm wheel_len = 20 # 20 cm wheel_dist = 150 # 1.5 m road_len = 500 # 5 m snail_speed = 1 # 1 cm/min sims = [ Simulation( q, runs, rng, snail_len, wheel_len, wheel_dist, road_len, snail_speed, ) for rng in rng_stream ] processes: T.List[mp.Process] = [mp.Process(target=s.run, args=()) for s in sims] collector = mp.Process(target=gather, args=(q,)) collector.start() for p in processes: p.start() for p in processes: p.join() print(f"process finished: {p}") q.put(None) collector.join()