lumachina/sim.py
2024-07-12 12:58:10 +02:00

168 lines
4.7 KiB
Python

# This program assumes a tick interval of 1 minute. That is, the events are
# made to evolve on a one dimensional time grid of 1 minute resolution.
import os
import multiprocessing as mp
import typing as T
import numpy as np
DEFAULT_SEED = 1312
DEFAULT_PARALLELISM = mp.cpu_count()
DEFAULT_RUNS = 1000
class Simulation(object):
def __init__(
self,
queue: mp.Queue,
runs: int,
generator: np.random.Generator,
snail_len: float,
wheel_len: float,
wheel_dist: float,
road_len: float,
snail_speed: float,
) -> None:
self.queue = queue
self.runs = runs
self.generator = generator
self.snail_len = snail_len / road_len
self.wheel_len = wheel_len / road_len
self.wheel_dist = wheel_dist / road_len
self.starting_wheel_pos_mul = (road_len - 2 * wheel_len - wheel_dist) / road_len
self.snail_speed = snail_speed / road_len
def is_car_passing(self) -> bool:
"""
The probability a car is passing every minute is assumed to be
uniformly distributed in the hour. Having 10 cars/hour, and a tick
interval of 1 minute we have:
P_car = 10/60 ~ 0.1666666
"""
return self.generator.random() <= 1 / 6
def is_dead(self, x: float) -> bool:
"""
The probability of the car hitting the snail being at x is the
probability of any of the pieces of the two wheels to be in the
interval [x-snail_len, x].
| |
|-------------------------------------------------------| road_len
|------- x |
| @@@ |
|---- x-snail_len |
| |
| |
|---------- x_wheel_pos |
| XXXXXXX--(wheel_dist)--XXXXXXX |
| ------- wheel_len ------- |
| |
"""
x_wheel_pos = self.generator.random() * self.starting_wheel_pos_mul
first_wheel_hit = self.intersect(x, x_wheel_pos)
second_wheel_hit = self.intersect(x, x_wheel_pos + self.wheel_dist)
return first_wheel_hit or second_wheel_hit
def intersect(self, x_snail: float, x_wheel: float) -> bool:
return (x_wheel > x_snail - self.snail_len) and (x_wheel < x_snail)
def _run(self) -> bool:
"""
This is the entrypoint of the single run of a simulation. The result is
a boolean indicating whether the snail did survive the road crossing.
The simulation ends if the snail manages to run all the
road_len+snail_len distance.
"""
win_dist = 1 + self.snail_len
x = 0
while x < win_dist:
if self.is_car_passing() and self.is_dead(x):
return False
x += self.snail_speed
return True
def run(self) -> None:
for i in range(self.runs):
self.queue.put(self._run())
self.queue.close()
def clear():
os.system("cls" if os.name == "nt" else "clear")
def print_result(runs: int, survived: int) -> None:
print(f"total runs: {runs}")
print(f"result: {survived/runs}")
def gather(q: mp.Queue()) -> None:
runs = 0
survived = 0
while True:
data = q.get()
if data is None:
break
runs += 1
if data:
survived += 1
print("============= FINAL RESULT ============")
print_result(runs, survived)
if __name__ == "__main__":
seed = int(os.getenv("SEED") or DEFAULT_SEED)
procs = int(os.getenv("PARALLELISM") or DEFAULT_PARALLELISM)
runs = int(os.getenv("RUNS") or DEFAULT_RUNS)
q = mp.Queue()
seq = np.random.default_rng(int(seed))
rng_stream = seq.spawn(int(procs))
snail_len = 5 # 5 cm
wheel_len = 20 # 20 cm
wheel_dist = 150 # 1.5 m
road_len = 500 # 5 m
snail_speed = 1 # 1 cm/min
sims = [
Simulation(
q,
runs,
rng,
snail_len,
wheel_len,
wheel_dist,
road_len,
snail_speed,
)
for rng in rng_stream
]
processes: T.List[mp.Process] = [mp.Process(target=s.run, args=()) for s in sims]
collector = mp.Process(target=gather, args=(q,))
collector.start()
for p in processes:
p.start()
for p in processes:
p.join()
print(f"process finished: {p}")
q.put(None)
collector.join()