diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 497f35a69..e00f067c5 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -2387,6 +2387,11 @@ def all_runs(self): def latest_run(self): return self.pipelineruns.first() if self.pipelineruns.exists() else None + @property + def latest_successful_run(self): + successful_runs = self.pipelineruns.filter(run_end_date__isnull=False, run_exitcode=0) + return successful_runs.first() if successful_runs.exists() else None + @property def earliest_run(self): return self.pipelineruns.earliest("run_start_date") if self.pipelineruns.exists() else None diff --git a/vulnerabilities/tasks.py b/vulnerabilities/tasks.py index 6c2be3fac..a78c13551 100644 --- a/vulnerabilities/tasks.py +++ b/vulnerabilities/tasks.py @@ -9,24 +9,23 @@ import logging +from collections import Counter +from contextlib import suppress from io import StringIO from traceback import format_exc as traceback_format_exc import django_rq +from redis.exceptions import ConnectionError +from rq import Worker from vulnerabilities import models from vulnerabilities.importer import Importer from vulnerabilities.improver import Improver +from vulnerablecode.settings import RQ_QUEUES logger = logging.getLogger(__name__) -default_queue = django_rq.get_queue("default") -high_queue = django_rq.get_queue("high") - -queues = { - "default": django_rq.get_queue("default"), - "high": django_rq.get_queue("high"), -} +queues = {queue: django_rq.get_queue(queue) for queue in RQ_QUEUES.keys()} def execute_pipeline(pipeline_id, run_id): @@ -151,3 +150,61 @@ def dequeue_job(job_id): for queue in queues.values(): if job_id in queue.jobs: queue.remove(job_id) + + +def compute_queue_load_factor(): + """ + Compute worker load per queue. + + Load factor is the ratio of the total compute required to run all active pipelines + in a queue to the available worker capacity for that queue over a 24-hour period. 
+ A value greater than 1 indicates that the number of workers is insufficient to + run all pipelines within the schedule. + + Also compute the additional workers needed to balance each queue. + """ + field = models.PipelineSchedule._meta.get_field("run_priority") + label_to_value = {label: value for value, label in field.choices} + total_compute_seconds_per_queue = {} + worker_per_queue = {} + load_per_queue = {} + seconds_in_24_hr = 86400 + + with suppress(ConnectionError): + redis_conn = django_rq.get_connection() + queue_names = [ + w.queue_names()[0] for w in Worker.all(connection=redis_conn) if w.queue_names() + ] + worker_per_queue = dict(Counter(queue_names)) + + for queue in RQ_QUEUES.keys(): + total_compute_seconds_per_queue[queue] = sum( + (p.latest_successful_run.runtime / (p.run_interval / 24)) + for p in models.PipelineSchedule.objects.filter( + is_active=True, run_priority=label_to_value[queue] + ) + if p.latest_successful_run + ) + if queue not in worker_per_queue: + worker_per_queue[queue] = 0 + + for queue_name, worker_count in worker_per_queue.items(): + net_load_on_queue = "no_worker" + total_compute = total_compute_seconds_per_queue.get(queue_name, 0) + if total_compute == 0: + continue + + unit_load_on_queue = total_compute / seconds_in_24_hr + + num_of_worker_for_balanced_queue = round(unit_load_on_queue) + additional_worker_needed = max(num_of_worker_for_balanced_queue - worker_count, 0) + + if worker_count > 0: + net_load_on_queue = unit_load_on_queue / worker_count + + load_per_queue[queue_name] = { + "load_factor": net_load_on_queue, + "additional_worker": additional_worker_needed, + } + + return dict(sorted(load_per_queue.items(), key=lambda x: x[0], reverse=True)) diff --git a/vulnerabilities/templates/pipeline_dashboard.html index fc474efe7..826c614ea 100644 --- a/vulnerabilities/templates/pipeline_dashboard.html +++ b/vulnerabilities/templates/pipeline_dashboard.html @@ -1,5 +1,7 @@ {% 
extends "base.html" %} +{% load utils %} + {% block title %} Pipeline Dashboard {% endblock %} @@ -22,6 +24,18 @@ .column { word-break: break-word; } + + .has-text-orange { + color: #ff8c42 !important; + } + + .has-tooltip-orange::before { + background-color: #ff8c42 !important; + } + + .has-tooltip-orange::after { + border-top-color: #ff8c42 !important; + } {% endblock %} @@ -48,11 +62,63 @@
- {{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }}, - {{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }} -
++ + Load Factor: + + {% for queue_name, values in load_per_queue.items %} + + + {{ queue_name| capfirst }} + + {% with load_factor=values|get_item:"load_factor" additional=values|get_item:"additional_worker" %} + {% if load_factor == "no_worker" %} + + + + {% elif load_factor < 1 %} + + {{ load_factor|floatformat:2 }} + + + {% elif load_factor < 1.6 %} + + {{ load_factor|floatformat:2 }} + + + {% else %} + + {{ load_factor|floatformat:2 }} + + + {% endif %} + {% endwith %} + + {% if not forloop.last %} • {% endif %} + + {% endfor %} +
+ {% endif %} ++ {{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }}, + {{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }} +
+