#!/usr/bin/env python3 # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """Instrument visualmetrics.py to run in parallel.""" import argparse import json import os import subprocess import sys import tarfile from concurrent.futures import ProcessPoolExecutor from functools import partial from multiprocessing import cpu_count from pathlib import Path import attr import structlog from jsonschema import validate from voluptuous import ALLOW_EXTRA, Required, Schema #: The directory where artifacts from this job will be placed. OUTPUT_DIR = Path("/", "builds", "worker", "artifacts") #: A job to process through visualmetrics.py @attr.s class Job: #: The name of the test. test_name = attr.ib(type=str) #: json_path: The path to the ``browsertime.json`` file on disk. json_path = attr.ib(type=Path) #: video_path: The path of the video file on disk. video_path = attr.ib(type=Path) #: The schema for validating jobs. JOB_SCHEMA = Schema( { Required("jobs"): [ {Required("test_name"): str, Required("browsertime_json_path"): str} ], Required("application"): {Required("name"): str, "version": str}, } ) #: A partial schema for browsertime.json files. BROWSERTIME_SCHEMA = Schema( [{Required("files"): {Required("video"): [str]}}], extra=ALLOW_EXTRA ) with Path("/", "builds", "worker", "performance-artifact-schema.json").open() as f: PERFHERDER_SCHEMA = json.loads(f.read()) def run_command(log, cmd): """Run a command using subprocess.check_output Args: log: The structlog logger instance. cmd: the command to run as a list of strings. Returns: A tuple of the process' exit status and standard output. """ log.info("Running command", cmd=cmd) try: res = subprocess.check_output(cmd) log.info("Command succeeded", result=res) return 0, res except subprocess.CalledProcessError as e: log.info("Command failed", cmd=cmd, status=e.returncode, output=e.output) return e.returncode, e.output def append_result(log, suites, test_name, name, result): """Appends a ``name`` metrics result in the ``test_name`` suite. Args: log: The structlog logger instance. suites: A mapping containing the suites. test_name: The name of the test. name: The name of the metrics. result: The value to append. """ if name.endswith("Progress"): return try: result = int(result) except ValueError: log.error("Could not convert value", name=name) log.error("%s" % result) result = 0 if test_name not in suites: suites[test_name] = {"name": test_name, "subtests": {}} subtests = suites[test_name]["subtests"] if name not in subtests: subtests[name] = { "name": name, "replicates": [result], "lowerIsBetter": True, "unit": "ms", } else: subtests[name]["replicates"].append(result) def compute_median(subtest): """Adds in the subtest the ``value`` field, which is the average of all replicates. Args: subtest: The subtest containing all replicates. Returns: The subtest. """ if "replicates" not in subtest: return subtest series = subtest["replicates"][1:] subtest["value"] = float(sum(series)) / float(len(series)) return subtest def get_suite(suite): """Returns the suite with computed medians in its subtests. Args: suite: The suite to convert. Returns: The suite. """ suite["subtests"] = [ compute_median(subtest) for subtest in suite["subtests"].values() ] return suite def read_json(json_path, schema): """Read the given json file and verify against the provided schema. Args: json_path: Path of json file to parse. schema: A callable to validate the JSON's schema. Returns: The contents of the file at ``json_path`` interpreted as JSON. """ try: with open(str(json_path), "r") as f: data = json.load(f) except Exception: log.error("Could not read JSON file", path=json_path, exc_info=True) raise log.info("Loaded JSON from file", path=json_path, read_json=data) try: schema(data) except Exception: log.error("JSON failed to validate", exc_info=True) raise return data def main(log, args): """Run visualmetrics.py in parallel. Args: log: The structlog logger instance. args: The parsed arguments from the argument parser. Returns: The return code that the program will exit with. """ fetch_dir = os.getenv("MOZ_FETCHES_DIR") if not fetch_dir: log.error("Expected MOZ_FETCHES_DIR environment variable.") return 1 fetch_dir = Path(fetch_dir) visualmetrics_path = fetch_dir / "visualmetrics.py" if not visualmetrics_path.exists(): log.error( "Could not locate visualmetrics.py", expected_path=str(visualmetrics_path) ) return 1 browsertime_results_path = fetch_dir / "browsertime-results.tgz" try: with tarfile.open(str(browsertime_results_path)) as tar: tar.extractall(path=str(fetch_dir)) except Exception: log.error( "Could not read extract browsertime results archive", path=browsertime_results_path, exc_info=True, ) return 1 log.info("Extracted browsertime results", path=browsertime_results_path) try: jobs_json_path = fetch_dir / "browsertime-results" / "jobs.json" jobs_json = read_json(jobs_json_path, JOB_SCHEMA) except Exception: return 1 jobs = [] for job in jobs_json["jobs"]: browsertime_json_path = fetch_dir / job["browsertime_json_path"] try: browsertime_json = read_json(browsertime_json_path, BROWSERTIME_SCHEMA) except Exception: return 1 for site in browsertime_json: for video in site["files"]["video"]: jobs.append( Job( test_name=job["test_name"], json_path=browsertime_json_path, video_path=browsertime_json_path.parent / video, ) ) failed_runs = 0 suites = {} with ProcessPoolExecutor(max_workers=cpu_count()) as executor: for job, result in zip( jobs, executor.map( partial( run_visual_metrics, visualmetrics_path=visualmetrics_path, options=args.visual_metrics_options, ), jobs, ), ): returncode, res = result if returncode != 0: log.error( "Failed to run visualmetrics.py", video_location=job.video_location, error=res, ) failed_runs += 1 else: # Python 3.5 requires a str object (not 3.6+) res = json.loads(res.decode("utf8")) for name, value in res.items(): append_result(log, suites, job.test_name, name, value) suites = [get_suite(suite) for suite in suites.values()] perf_data = { "framework": {"name": "browsertime"}, "application": jobs_json["application"], "type": "vismet", "suites": suites, } # Validates the perf data complies with perfherder schema. # The perfherder schema uses jsonschema so we can't use voluptuous here. validate(perf_data, PERFHERDER_SCHEMA) raw_perf_data = json.dumps(perf_data) with Path(OUTPUT_DIR, "perfherder-data.json").open("w") as f: f.write(raw_perf_data) # Prints the data in logs for Perfherder to pick it up. log.info("PERFHERDER_DATA: %s" % raw_perf_data) # Lists the number of processed jobs, failures, and successes. with Path(OUTPUT_DIR, "summary.json").open("w") as f: json.dump( { "total_jobs": len(jobs), "successful_runs": len(jobs) - failed_runs, "failed_runs": failed_runs, }, f, ) # If there's one failure along the way, we want to return > 0 # to trigger a red job in TC. return failed_runs def run_visual_metrics(job, visualmetrics_path, options): """Run visualmetrics.py on the input job. Returns: A returncode and a string containing the output of visualmetrics.py """ cmd = ["/usr/bin/python", str(visualmetrics_path), "--video", str(job.video_path)] cmd.extend(options) return run_command(log, cmd) if __name__ == "__main__": structlog.configure( processors=[ structlog.processors.TimeStamper(fmt="iso"), structlog.processors.format_exc_info, structlog.dev.ConsoleRenderer(colors=False), ], cache_logger_on_first_use=True, ) parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument( "visual_metrics_options", type=str, metavar="VISUAL-METRICS-OPTIONS", help="Options to pass to visualmetrics.py", nargs="*", ) args = parser.parse_args() log = structlog.get_logger() try: sys.exit(main(log, args)) except Exception as e: log.error("Unhandled exception: %s" % e, exc_info=True) sys.exit(1)