168 lines
4.7 KiB
Python
168 lines
4.7 KiB
Python
# This program assumes a tick interval of 1 minute. That is, the events are
|
|
# made to evolve on a one dimensional time grid of 1 minute resolution.
|
|
|
|
import os
|
|
import multiprocessing as mp
|
|
import typing as T
|
|
|
|
import numpy as np
|
|
|
|
|
|
DEFAULT_SEED = 1312
|
|
DEFAULT_PARALLELISM = mp.cpu_count()
|
|
DEFAULT_RUNS = 1000
|
|
|
|
|
|
class Simulation(object):
|
|
def __init__(
|
|
self,
|
|
queue: mp.Queue,
|
|
runs: int,
|
|
generator: np.random.Generator,
|
|
snail_len: float,
|
|
wheel_len: float,
|
|
wheel_dist: float,
|
|
road_len: float,
|
|
snail_speed: float,
|
|
) -> None:
|
|
self.queue = queue
|
|
self.runs = runs
|
|
self.generator = generator
|
|
self.snail_len = snail_len / road_len
|
|
self.wheel_len = wheel_len / road_len
|
|
self.wheel_dist = wheel_dist / road_len
|
|
self.starting_wheel_pos_mul = (road_len - 2 * wheel_len - wheel_dist) / road_len
|
|
self.snail_speed = snail_speed / road_len
|
|
|
|
def is_car_passing(self) -> bool:
|
|
"""
|
|
The probability a car is passing every minute is assumed to be
|
|
uniformly distributed in the hour. Having 10 cars/hour, and a tick
|
|
interval of 1 minute we have:
|
|
|
|
P_car = 10/60 ~ 0.1666666
|
|
"""
|
|
return self.generator.random() <= 1 / 6
|
|
|
|
def is_dead(self, x: float) -> bool:
|
|
"""
|
|
The probability of the car hitting the snail being at x is the
|
|
probability of any of the pieces of the two wheels to be in the
|
|
interval [x-snail_len, x].
|
|
|
|
| |
|
|
|-------------------------------------------------------| road_len
|
|
|------- x |
|
|
| @@@ |
|
|
|---- x-snail_len |
|
|
| |
|
|
| |
|
|
|---------- x_wheel_pos |
|
|
| XXXXXXX--(wheel_dist)--XXXXXXX |
|
|
| ------- wheel_len ------- |
|
|
| |
|
|
"""
|
|
|
|
x_wheel_pos = self.generator.random() * self.starting_wheel_pos_mul
|
|
first_wheel_hit = self.intersect(x, x_wheel_pos)
|
|
second_wheel_hit = self.intersect(x, x_wheel_pos + self.wheel_dist)
|
|
|
|
return first_wheel_hit or second_wheel_hit
|
|
|
|
def intersect(self, x_snail: float, x_wheel: float) -> bool:
|
|
return (x_wheel > x_snail - self.snail_len) and (x_wheel < x_snail)
|
|
|
|
def _run(self) -> bool:
|
|
"""
|
|
This is the entrypoint of the single run of a simulation. The result is
|
|
a boolean indicating whether the snail did survive the road crossing.
|
|
|
|
The simulation ends if the snail manages to run all the
|
|
road_len+snail_len distance.
|
|
"""
|
|
|
|
win_dist = 1 + self.snail_len
|
|
x = 0
|
|
while x < win_dist:
|
|
if self.is_car_passing() and self.is_dead(x):
|
|
return False
|
|
|
|
x += self.snail_speed
|
|
|
|
return True
|
|
|
|
def run(self) -> None:
|
|
for i in range(self.runs):
|
|
self.queue.put(self._run())
|
|
self.queue.close()
|
|
|
|
|
|
def clear():
|
|
os.system("cls" if os.name == "nt" else "clear")
|
|
|
|
|
|
def print_result(runs: int, survived: int) -> None:
|
|
print(f"total runs: {runs}")
|
|
print(f"result: {survived/runs}")
|
|
|
|
|
|
def gather(q: mp.Queue()) -> None:
|
|
runs = 0
|
|
survived = 0
|
|
while True:
|
|
data = q.get()
|
|
if data is None:
|
|
break
|
|
runs += 1
|
|
if data:
|
|
survived += 1
|
|
|
|
print("============= FINAL RESULT ============")
|
|
print_result(runs, survived)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
seed = int(os.getenv("SEED") or DEFAULT_SEED)
|
|
procs = int(os.getenv("PARALLELISM") or DEFAULT_PARALLELISM)
|
|
runs = int(os.getenv("RUNS") or DEFAULT_RUNS)
|
|
|
|
q = mp.Queue()
|
|
|
|
seq = np.random.default_rng(int(seed))
|
|
rng_stream = seq.spawn(int(procs))
|
|
|
|
snail_len = 5 # 5 cm
|
|
wheel_len = 20 # 20 cm
|
|
wheel_dist = 150 # 1.5 m
|
|
road_len = 500 # 5 m
|
|
snail_speed = 1 # 1 cm/min
|
|
|
|
sims = [
|
|
Simulation(
|
|
q,
|
|
runs,
|
|
rng,
|
|
snail_len,
|
|
wheel_len,
|
|
wheel_dist,
|
|
road_len,
|
|
snail_speed,
|
|
)
|
|
for rng in rng_stream
|
|
]
|
|
|
|
processes: T.List[mp.Process] = [mp.Process(target=s.run, args=()) for s in sims]
|
|
|
|
collector = mp.Process(target=gather, args=(q,))
|
|
collector.start()
|
|
|
|
for p in processes:
|
|
p.start()
|
|
|
|
for p in processes:
|
|
p.join()
|
|
print(f"process finished: {p}")
|
|
|
|
q.put(None)
|
|
collector.join()
|