diff --git a/config/multithread b/config/multithread
index 0440964876..dcc8911be4 100644
--- a/config/multithread
+++ b/config/multithread
@@ -30,6 +30,7 @@ start_multithread_build() {
   # When building addons, don't halt on error - keep building all packages/addons
   [ "${MTADDONBUILD}" = "yes" ] && buildopts+=" --continue-on-error" || buildopts+=" --halt-on-error"
 
+  [ "${MTPROGRESS}" = "yes" ] && buildopts+=" --progress"
   [ "${MTVERBOSE}" = "yes" ] && buildopts+=" --verbose"
   [ "${MTDEBUG}" = "yes" ] && buildopts+=" --debug"
   if [ "${DISABLE_COLORS}" = "yes" ]; then
diff --git a/scripts/pkgbuilder.py b/scripts/pkgbuilder.py
index be12d8a0ac..372787043a 100755
--- a/scripts/pkgbuilder.py
+++ b/scripts/pkgbuilder.py
@@ -14,6 +14,8 @@ import threading
 import queue
 import subprocess
 import multiprocessing
+import signal
+import fcntl, termios, struct
 
 # Ensure we can output any old crap to stdout and stderr
 sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
@@ -33,9 +35,10 @@ class RusagePopen(subprocess.Popen):
         self.rusage = ru
         return (pid, sts)
 
-def rusage_run(*popenargs, timeout=None, **kwargs):
+def rusage_run(*popenargs, parent=None, timeout=None, **kwargs):
     with RusagePopen(*popenargs, **kwargs) as process:
         try:
+            parent.child = process
             stdout, stderr = process.communicate(None, timeout=timeout)
         except subprocess.TimeoutExpired as exc:
             process.kill()
@@ -47,6 +50,7 @@ def rusage_run(*popenargs, timeout=None, **kwargs):
         retcode = process.poll()
         res = subprocess.CompletedProcess(process.args, retcode, stdout, stderr)
         res.rusage = process.rusage
+        parent.child = None
         return res
 
 class GeneratorEmpty(Exception):
@@ -206,6 +210,9 @@ class Generator:
         for name in self.failed:
             yield self.failed[name]
 
+    def completedJobCount(self):
+        return len(self.built)
+
     def totalJobCount(self):
         return self.totalJobs
 
@@ -255,11 +262,19 @@ class BuildProcess(threading.Thread):
 
         self.active = False
 
+        self.child = None
+        self.stopping = False
+
     def stop(self):
         self.stopping = True
         self.work.put(None)
 
+        if self.child:
+            try:
+                os.killpg(os.getpgid(self.child.pid), signal.SIGTERM)
+                self.child.wait()
+            except:
+                pass
+
     def isActive(self):
         return self.active == True
 
@@ -305,14 +320,14 @@
                 with open(job["logfile"], "w") as logfile:
                     cmd = rusage_run(job["args"], cwd=ROOT,
                                      stdin=subprocess.PIPE, stdout=logfile, stderr=subprocess.STDOUT,
-                                     universal_newlines=True, shell=False)
+                                     universal_newlines=True, shell=False, parent=self, start_new_session=True)
                     returncode = cmd.returncode
                     job["cmdproc"] = cmd
             else:
                 try:
                     cmd = rusage_run(job["args"], cwd=ROOT,
                                      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
-                                     universal_newlines=True, shell=False,
+                                     universal_newlines=True, shell=False, parent=self, start_new_session=True,
                                      encoding="utf-8", errors="replace")
                     returncode = cmd.returncode
                     job["cmdproc"] = cmd
@@ -340,7 +355,7 @@ class Builder:
     def __init__(self, maxthreadcount, inputfilename, jobglog, loadstats, stats_interval, \
                  haltonerror=True, failimmediately=True, log_burst=True, log_combine="always", \
-                 autoremove=False, bookends=True, colors=False, debug=False, verbose=False):
+                 bookends=True, autoremove=False, colors=False, progress=False, debug=False, verbose=False):
         if inputfilename == "-":
             plan = json.load(sys.stdin)
         else:
@@ -349,6 +364,24 @@ class Builder:
 
         self.generator = Generator(plan)
 
+        self.jobtotal = self.generator.totalJobCount()
+        self.twidth = len("%d" % self.jobtotal)
+
+        # parse threadcount
+        if maxthreadcount.endswith("%"):
+            self.threadcount = int(multiprocessing.cpu_count() / 100 * int(args.max_procs.replace("%","")))
+        else:
+            if args.max_procs == "0":
+                self.threadcount = 256
+            else:
+                self.threadcount = int(maxthreadcount)
+
+        self.threadcount = min(self.jobtotal, self.threadcount)
+        self.threadcount = max(1, self.threadcount)
+
+        if args.debug:
+            DEBUG("THREADCOUNT#: input arg: %s, computed: %d" % (maxthreadcount, self.threadcount))
+
         self.joblog = jobglog
         self.loadstats = loadstats
         self.stats_interval = int(stats_interval)
@@ -364,6 +397,36 @@ class Builder:
         self.bookends = bookends
         self.autoremove = autoremove
 
+        self.stdout_dirty = False
+        self.stderr_dirty = False
+        self.progress_dirty = False
+
+        self.joblogfile = None
+        self.loadstatsfile = None
+        self.nextstats = 0
+        self.build_start = 0
+
+        self.work = queue.Queue()
+        self.complete = queue.Queue()
+        self.processes = []
+
+        # Init all processes
+        self.processes = []
+        for i in range(1, self.threadcount + 1):
+            self.processes.append(BuildProcess(i, self.threadcount, self.jobtotal, self.haltonerror, self.work, self.complete))
+
+        # work and completion sequences
+        self.wseq = 0
+        self.cseq = 0
+
+        self.progress = progress and sys.stderr.isatty()
+        self.progress_glitch_fix = ""
+        self.rows = self.columns = -1
+        self.original_resize_handler = None
+        if self.progress:
+            self.getTerminalSize()
+            self.original_resize_handler = signal.signal(signal.SIGWINCH, self.getTerminalSize)
+
         self.colors = (colors == "always" or (colors == "auto" and sys.stderr.isatty()))
         self.color_code = {}
         self.color_code["DONE"] = "\033[0;32m" #green
@@ -373,43 +436,6 @@ class Builder:
         self.color_code["INIT"] = "\033[0;36m" #cyan
         self.color_code["WAIT"] = "\033[0;35m" #magenta
 
-        self.work = queue.Queue()
-        self.complete = queue.Queue()
-
-        self.jobtotal = self.generator.totalJobCount()
-        self.twidth = len("%d" % self.jobtotal)
-
-        self.joblogfile = None
-        self.loadstatsfile = None
-        self.nextstats = 0
-
-        self.build_start = 0
-
-        # work and completion sequences
-        self.cseq = 0
-        self.wseq = 0
-
-        # parse threadcount
-        if maxthreadcount.endswith("%"):
-            self.threadcount = int(multiprocessing.cpu_count() / 100 * int(args.max_procs.replace("%","")))
-        else:
-            if args.max_procs == "0":
-                self.threadcount = 256
-            else:
-                self.threadcount = int(maxthreadcount)
-
-        self.threadcount = 1 if self.threadcount < 1 else self.threadcount
-        self.threadcount = min(self.jobtotal, self.threadcount)
-        self.threadcount = max(1, self.threadcount)
-
-        if args.debug:
-            DEBUG("THREADCOUNT#: input arg: %s, computed: %d" % (maxthreadcount, self.threadcount))
-
-        # Init all processes
-        self.processes = []
-        for i in range(1, self.threadcount + 1):
-            self.processes.append(BuildProcess(i, self.threadcount, self.jobtotal, haltonerror, self.work, self.complete))
-
     def build(self):
         if self.joblog:
             self.joblogfile = open(self.joblog, "w")
 
@@ -417,10 +443,11 @@
         if self.loadstats:
             self.loadstatsfile = open(self.loadstats, "w")
 
-        self.startProcesses()
-
         self.build_start = time.time()
 
+        for process in self.processes:
+            process.start()
+
         # Queue new work until no more work is available, and all queued jobs have completed.
         while self.queueWork():
             job = self.getCompletedJob()
 
@@ -434,7 +461,6 @@
             job = None
 
         self.captureStats(finished=True)
-        self.stopProcesses()
 
         if self.joblogfile:
             self.joblogfile.close()
@@ -446,11 +472,10 @@
         if self.haltonerror and not self.failimmediately:
             failed = [job for job in self.generator.failedJobs() if job["logfile"]]
             if failed != []:
-                print("\nThe following log(s) for this failure are available:", file=sys.stdout)
+                self.oprint("\nThe following log(s) for this failure are available:")
                 for job in failed:
-                    print("  %s => %s" % (job["name"], job["logfile"]), file=sys.stdout)
-                print("", file=sys.stdout)
-                sys.stdout.flush()
+                    self.oprint("  %s => %s" % (job["name"], job["logfile"]))
+                self.oprint("", flush=True)
             return False
 
         return True
@@ -466,7 +491,7 @@
         if self.haltonerror and self.generator.failedJobCount() != 0:
             if not self.failimmediately and self.generator.activeJobCount() != 0:
                 freeslots = self.threadcount - self.generator.activeJobCount()
-                self.vprint("WAIT", "waiting", ", ".join(self.generator.activeJobNames()))
+                self.show_status("WAIT", "waiting", ", ".join(self.generator.activeJobNames()))
                 DEBUG("Waiting for : %d active, %d idle [%s]" % (self.generator.activeJobCount(), freeslots, ", ".join(self.generator.activeJobNames())))
                 return True
             else:
@@ -477,7 +502,7 @@
             job = self.generator.getNextJob()
 
             if self.verbose:
-                self.vprint("INIT", "submit", job["name"])
+                self.show_status("INIT", "submit", job["name"])
 
             if self.debug:
                 DEBUG("Queueing Job: %s %s" % (job["task"], job["name"]))
@@ -490,7 +515,7 @@
             self.work.put(job)
 
             if self.verbose:
-                self.vprint("ACTV", "active", ", ".join(self.generator.activeJobNames()))
+                self.show_status("ACTV", "active", ", ".join(self.generator.activeJobNames()))
 
             if self.debug:
                 freeslots = self.threadcount - self.generator.activeJobCount()
@@ -502,8 +527,8 @@
             pending = []
             for (i, (package, wants)) in enumerate(self.generator.getStallInfo()):
                 pending.append("%s (wants: %s)" % (package, ", ".join(wants)))
-            self.vprint("ACTV", "active", ", ".join(self.generator.activeJobNames()))
-            self.vprint("IDLE", "stalled", "; ".join(pending), p1=len(pending))
+            self.show_status("ACTV", "active", ", ".join(self.generator.activeJobNames()))
+            self.show_status("IDLE", "stalled", "; ".join(pending), p1=len(pending))
 
             if self.debug:
                 freeslots = self.threadcount - self.generator.activeJobCount()
@@ -543,43 +568,99 @@
             pass
 
     def captureStats(self, finished=False):
+        self.displayProgress()
+        if finished:
+            self.clearProgress()
+            self.flush()
+
         if not self.loadstatsfile:
-            return None
+            if self.progress:
+                now = time.time()
+                return int(now + 1) - now
+            else:
+                return None
 
         now = time.time()
         if now >= self.nextstats or finished:
            self.nextstats = int(now - (now % self.stats_interval)) + self.stats_interval
 
-            loadavg = open("/proc/loadavg", "r").readline().split()
+            loadavg = self.getLoad()
            procs = loadavg[3].split("/")
 
-            meminfo = dict((i.split()[0].rstrip(':'),int(i.split()[1])) for i in open("/proc/meminfo", "r").readlines())
+            meminfo = self.getMemory()
 
            print("%d %06d %5s %5s %5s %3s %4s %9d %2d %s" % (now, now - self.build_start, \
                loadavg[0], loadavg[1], loadavg[2], procs[0], procs[1], meminfo["MemAvailable"], \
                self.generator.activeJobCount(), ",".join(self.generator.activeJobNames())), \
                file=self.loadstatsfile, flush=True)
 
-        return (self.nextstats - now)
+        if self.progress:
+            return min((self.nextstats - now), int(now + 1) - now)
+        else:
+            return (self.nextstats - now)
 
-    # Output progress info, and links to any relevant logs
+    def displayProgress(self):
+        if self.progress:
+            freeslots = self.threadcount - self.generator.activeJobCount()
+            if self.jobtotal != self.generator.completedJobCount():
+                percent = "%0.2f" % (100 / self.jobtotal * self.generator.completedJobCount())
+            else:
+                percent = "100"
+            loadavg = self.getLoad()
+            meminfo = self.getMemory()
+            available = int(meminfo["MemAvailable"]) / 1024
+
+            lines = [ "",
+                      "%s: %5s%% | load: %s mem: %d MB | failed: %d idle: %d active: %d" % \
+                        (self.secs2hms(time.time() - self.build_start), percent, \
+                         loadavg[0], available, \
+                         self.generator.failedJobCount(), freeslots, self.generator.activeJobCount()),
+                      "Building: %s" % ", ".join(self.generator.activeJobNames())
+                    ]
+
+            columns = self.columns # in theory could change mid-loop
+            output = []
+            for line in lines:
+                output.append(line if len(line) < columns else "%s+" % line[0:columns - 2])
+
+            if not self.progress_glitch_fix:
+                self.progress_glitch_fix = "%s\033[%dA" % ("\n" * len(output), len(output))
+
+            # \033[?7l: disable linewrap
+            # \033[0K: clear cursor to end of line (every line but last)
+            # \033[0J: clear cursor to end of screen (last line)
+            # \033%dA: move cursor up %d lines (move back to "home" position)
+            # \033[?7h: re-enable linewrap
+            #
+            # When the console is resized to a narrower width, lines wider than the
+            # new console width may be wrapped to a second line (depends on console
+            # software, for example PuTTY) so disable line wrapping to prevent this.
+            #
+            self.eprint("\033[?7l%s\033[0J\r\033[%dA\033[?7h" % ("\033[0K\n".join(output), len(output) - 1),
+                        end="\r", isProgress=True)
+            self.progress_dirty = True
+
+    def clearProgress(self):
+        if self.progress and self.progress_dirty:
+            self.progress_dirty = False
+            self.eprint("\033[0J", end="")
+
+    # Output completion info, and links to any relevant logs
     def displayJobStatus(self, job):
         self.cseq += 1
-        self.vprint(job["status"], job["task"], job["name"], p1=self.cseq, p2=self.jobtotal)
+        self.show_status(job["status"], job["task"], job["name"], p1=self.cseq, p2=self.jobtotal)
 
         if job["failed"]:
             if job["logfile"]:
-                print("\nThe following log for this failure is available:\n  %s\n" % job["logfile"], \
-                    file=sys.stderr, flush=True)
+                self.eprint("\nThe following log for this failure is available:\n  %s\n" % job["logfile"])
 
             if job["failedjobs"] and job["failedjobs"][0]["logfile"]:
                 if len(job["failedjobs"]) == 1:
-                    print("The following log from the failed dependency may be relevant:", file=sys.stderr)
+                    self.eprint("The following log from the failed dependency may be relevant:")
                 else:
-                    print("The following logs from the failed dependencies may be relevant:", file=sys.stderr)
+                    self.eprint("The following logs from the failed dependencies may be relevant:")
 
                 for fjob in job["failedjobs"]:
-                    print("  %-7s %s => %s" % (fjob["task"], fjob["name"], fjob["logfile"]), file=sys.stderr)
-                print("", file=sys.stderr)
-                sys.stderr.flush()
+                    self.eprint("  %-7s %s => %s" % (fjob["task"], fjob["name"], fjob["logfile"]))
+                self.eprint("")
 
     # If configured, send output for a job (either a logfile, or captured stdout) to stdout
     def processJobOutput(self, job):
@@ -590,55 +671,52 @@
         if job["logfile"]:
             if self.log_combine == "always" or (job["failed"] and self.log_combine == "fail"):
                 if self.bookends:
-                    print("<<< %s seq %s <<<" % (job["name"], job["seq"]))
+                    self.oprint("<<< %s seq %s <<<" % (job["name"], job["seq"]))
 
                 try:
                     with open(job["logfile"], "r", encoding="utf-8", errors="replace") as logfile:
                         for line in logfile:
-                            print(line, end="")
-                            if self.debug:
-                                log_size += len(line)
+                            self.oprint(line, end="")
+                            log_size += len(line)
                 except UnicodeDecodeError:
-                    print("\nPKGBUILDER ERROR: UnicodeDecodeError while reading log file %s\n" % job["logfile"], file=sys.stderr, flush=True)
+                    self.eprint("\nPKGBUILDER ERROR: UnicodeDecodeError while reading log file %s\n" % job["logfile"])
 
                 if job["failed"]:
-                    print("\nThe following log for this failure is available:\n  %s\n" % job["logfile"])
+                    self.oprint("\nThe following log for this failure is available:\n  %s\n" % job["logfile"])
 
                 if self.bookends:
-                    print(">>> %s seq %s >>>" % (job["name"], job["seq"]))
+                    self.oprint(">>> %s seq %s >>>" % (job["name"], job["seq"]))
 
-                sys.stdout.flush()
                 log_processed = True
 
         elif job["cmdproc"]:
             if self.log_combine == "always" or (job["failed"] and self.log_combine == "fail"):
                 if self.bookends:
-                    print("<<< %s" % job["name"])
+                    self.oprint("<<< %s" % job["name"])
 
                 for line in job["cmdproc"].stdout:
-                    print(line, end="")
-                    if self.debug:
-                        log_size += len(line)
+                    self.oprint(line, end="")
+                    log_size += len(line)
 
                 if "autoremove" in job:
                     for line in job["autoremove"].stdout:
-                        print(line, end="")
-                        if self.debug:
-                            log_size += len(line)
+                        self.oprint(line, end="")
+                        log_size += len(line)
                     job["autoremove"] = None
 
                 if self.bookends:
-                    print(">>> %s" % job["name"])
+                    self.oprint(">>> %s" % job["name"])
 
-                sys.stdout.flush()
                 log_processed = True
 
-        log_elapsed = time.time() - log_start
+        if log_processed:
+            self.flush()
 
-        if self.debug and log_processed:
-            log_rate = int(log_size / log_elapsed) if log_elapsed != 0 else 0
-            log_data = ", %s" % "/".join(job["logfile"].split("/")[-2:]) if job["logfile"] else ""
-            DEBUG("WRITING LOG : {0:,} bytes in {1:0.3f} seconds ({2:,d} bytes/sec{3:})".format(log_size, log_elapsed, log_rate, log_data))
+        if self.debug:
+            log_elapsed = time.time() - log_start
+            log_rate = int(log_size / log_elapsed) if log_elapsed != 0 else 0
+            log_data = ", %s" % "/".join(job["logfile"].split("/")[-2:]) if job["logfile"] else ""
+            DEBUG("WRITING LOG : {0:,} bytes in {1:0.3f} seconds ({2:,d} bytes/sec{3:})".format(log_size, log_elapsed, log_rate, log_data))
 
     # Log completion stats for job
     def writeJobLog(self, job):
@@ -673,25 +751,76 @@
 
         self.generator.removed(pkg_name)
 
-    def startProcesses(self):
-        for process in self.processes:
-            process.start()
+    def show_status(self, status, task, data, p1=None, p2=None):
+        p1 = (self.threadcount - self.generator.activeJobCount()) if p1 == None else p1
+        p2 = self.generator.activeJobCount() if p2 == None else p2
+
+        colored_status = "%s%-4s\033[0m" % (self.color_code[status], status) if self.colors else "%-4s" % status
+
+        self.eprint("%s[%0*d/%0*d] [%s] %-7s %s" % \
+            (self.progress_glitch_fix, self.twidth, p1, self.twidth, p2, colored_status, task, data))
 
     def stopProcesses(self):
-        for process in self.processes:
-            process.stop()
+        if self.processes:
+            for process in self.processes:
+                process.stop()
+            self.processes = None
 
-    def vprint(self, status, task, data, p1=None, p2=None):
-        p1 = (self.threadcount - self.generator.activeJobCount()) if p1 == None else p1
-        p2 = self.generator.activeJobCount() if p2 == None else p2
-        print("[%0*d/%0*d] [%4s] %-7s %s" %
-            (self.twidth, p1, self.twidth, p2,
-             self.colorise(status), task, data), file=sys.stderr, flush=True)
+    def cleanup(self):
+        self.clearProgress()
+        self.flush()
+        if self.original_resize_handler != None:
+            signal.signal(signal.SIGWINCH, self.original_resize_handler)
+        self.stopProcesses()
 
-    def colorise(self, item):
-        if self.colors:
-            return "%s%-4s\033[0m" % (self.color_code[item], item)
-        return item
+    def flush(self):
+        if self.stdout_dirty:
+            sys.stdout.flush()
+            self.stdout_dirty = False
+
+        if self.stderr_dirty:
+            sys.stderr.flush()
+            self.stderr_dirty = False
+
+    def oprint(self, *args, flush=False, **kwargs):
+        if self.progress_dirty:
+            self.clearProgress()
+
+        if self.stderr_dirty:
+            sys.stderr.flush()
+            self.stderr_dirty = False
+
+        print(*args, **kwargs, file=sys.stdout, flush=flush)
+        self.stdout_dirty = not flush
+
+    def eprint(self, *args, flush=False, isProgress=False, **kwargs):
+        if self.stdout_dirty:
+            sys.stdout.flush()
+            self.stdout_dirty = False
+
+        if not isProgress and self.progress_dirty:
+            self.clearProgress()
+
+        print(*args, **kwargs, file=sys.stderr, flush=flush)
+        self.stderr_dirty = not flush
+
+    def getLoad(self):
+        return open("/proc/loadavg", "r").readline().split()
+
+    def getMemory(self):
+        return dict((i.split()[0].rstrip(':'),int(i.split()[1])) for i in open("/proc/meminfo", "r").readlines())
+
+    def getTerminalSize(self, signum = None, frame = None):
+        h, w, hp, wp = struct.unpack('HHHH',
+            fcntl.ioctl(sys.stderr.fileno(), termios.TIOCGWINSZ,
+            struct.pack('HHHH', 0, 0, 0, 0)))
+        self.rows = h
+        self.columns = w
+
+    def secs2hms(self, seconds):
+        min, sec = divmod(seconds, 60)
+        hour, min = divmod(min, 60)
+        return "%02d:%02d:%02d" % (hour, min, sec)
 
 def DEBUG(msg):
     if DEBUG_LOG:
@@ -749,6 +878,9 @@ group.add_argument("--fail-after-active", action="store_false", dest="fail_immed
 parser.add_argument("--auto-remove", action="store_true", default=False, \
                     help="Automatically remove redundant source code directories. Default is disabled.")
 
+parser.add_argument("--progress", action="store_true", default=False, \
+                    help="Display progress information. Default is disabled.")
+
 parser.add_argument("--verbose", action="store_true", default=False, \
                     help="Output verbose information to stderr.")
 
@@ -777,17 +909,22 @@ with open("%s/parallel.pid" % THREAD_CONTROL, "w") as pid:
     print("%d" % os.getpid(), file=pid)
 
 try:
-    result = Builder(args.max_procs, args.plan, args.joblog, args.loadstats, args.stats_interval, \
-                     haltonerror=args.halt_on_error, failimmediately=args.fail_immediately, \
-                     log_burst=args.log_burst, log_combine=args.log_combine, bookends=args.with_bookends, \
-                     autoremove=args.auto_remove, colors=args.colors, \
-                     debug=args.debug, verbose=args.verbose).build()
+    builder = Builder(args.max_procs, args.plan, args.joblog, args.loadstats, args.stats_interval, \
+                      haltonerror=args.halt_on_error, failimmediately=args.fail_immediately, \
+                      log_burst=args.log_burst, log_combine=args.log_combine, bookends=args.with_bookends, \
+                      autoremove=args.auto_remove, colors=args.colors, progress=args.progress, \
+                      debug=args.debug, verbose=args.verbose)
+
+    result = builder.build()
 
     if DEBUG_LOG:
        DEBUG_LOG.close()
 
     sys.exit(0 if result else 1)
 except (KeyboardInterrupt, SystemExit) as e:
+    if builder:
+        builder.cleanup()
+
     if type(e) == SystemExit:
         sys.exit(int(str(e)))
     else: