From 5962ce66abb90cee3f1df7455aabd3e5306afe0e Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Fri, 10 Apr 2026 19:54:05 +0530 Subject: [PATCH 1/4] feat: add function to compute queue load factor Signed-off-by: Keshav Priyadarshi --- vulnerabilities/models.py | 5 +++ vulnerabilities/tasks.py | 67 +++++++++++++++++++++++++++++++++++---- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 497f35a69..e00f067c5 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -2387,6 +2387,11 @@ def all_runs(self): def latest_run(self): return self.pipelineruns.first() if self.pipelineruns.exists() else None + @property + def latest_successful_run(self): + successful_runs = self.pipelineruns.filter(run_end_date__isnull=False, run_exitcode=0) + return successful_runs.first() if successful_runs.exists() else None + @property def earliest_run(self): return self.pipelineruns.earliest("run_start_date") if self.pipelineruns.exists() else None diff --git a/vulnerabilities/tasks.py b/vulnerabilities/tasks.py index 6c2be3fac..2454e041b 100644 --- a/vulnerabilities/tasks.py +++ b/vulnerabilities/tasks.py @@ -9,24 +9,23 @@ import logging +from collections import Counter +from contextlib import suppress from io import StringIO from traceback import format_exc as traceback_format_exc import django_rq +from redis.exceptions import ConnectionError +from rq import Worker from vulnerabilities import models from vulnerabilities.importer import Importer from vulnerabilities.improver import Improver +from vulnerablecode.settings import RQ_QUEUES logger = logging.getLogger(__name__) -default_queue = django_rq.get_queue("default") -high_queue = django_rq.get_queue("high") - -queues = { - "default": django_rq.get_queue("default"), - "high": django_rq.get_queue("high"), -} +queues = {queue: django_rq.get_queue(queue) for queue in RQ_QUEUES.keys()} def execute_pipeline(pipeline_id, run_id): @@ -151,3 +150,57 @@ def dequeue_job(job_id): for queue in queues.values(): if job_id in queue.jobs: queue.remove(job_id) + + +def compute_queue_load_factor(): + """ + Compute worker load per queue. + + Load factor is the ratio of the total compute required to run all active pipelines + in a queue to the available worker capacity for that queue over a 24-hour period. + A value greater than 1 indicates that the number of workers is insufficient to + run all pipelines within the schedule. + + Also compute the additional workers needed to balance each queue + """ + field = models.PipelineSchedule._meta.get_field("run_priority") + label_to_value = {label: value for value, label in field.choices} + total_compute_seconds_per_queue = {} + worker_per_queue = {} + load_per_queue = {} + seconds_in_24_hr = 86400 + + for queue in RQ_QUEUES.keys(): + total_compute_seconds_per_queue[queue] = sum( + (p.latest_successful_run.runtime / (p.run_interval / 24)) + for p in models.PipelineSchedule.objects.filter( + is_active=True, run_priority=label_to_value[queue] + ) + if p.latest_successful_run + ) + + with suppress(ConnectionError): + redis_conn = django_rq.get_connection() + queue_names = [ + w.queue_names()[0] for w in Worker.all(connection=redis_conn) if w.queue_names() + ] + worker_per_queue = dict(Counter(queue_names)) + + for queue_name, worker_count in worker_per_queue.items(): + total_compute = total_compute_seconds_per_queue.get(queue_name, 0) + if worker_count == 0 or total_compute == 0: + continue + + unit_load_on_queue = total_compute / seconds_in_24_hr + + num_of_worker_for_balanced_queue = round(unit_load_on_queue) + addition_worker_needed = max(num_of_worker_for_balanced_queue - worker_count, 0) + + net_load_on_queue = unit_load_on_queue / worker_count + + load_per_queue[queue_name] = { + "load_factor": net_load_on_queue, + "additional_worker": addition_worker_needed, + } + + return dict(sorted(load_per_queue.items(), key=lambda x: x[0], reverse=True)) From 4240d36a21c2301a5719f7f64916f9b1141457c2 Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Fri, 10 Apr 2026 20:42:42 +0530 Subject: [PATCH 2/4] fix: track queues with no workers in load factor Signed-off-by: Keshav Priyadarshi --- vulnerabilities/tasks.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/vulnerabilities/tasks.py b/vulnerabilities/tasks.py index 2454e041b..a78c13551 100644 --- a/vulnerabilities/tasks.py +++ b/vulnerabilities/tasks.py @@ -170,6 +170,13 @@ def compute_queue_load_factor(): load_per_queue = {} seconds_in_24_hr = 86400 + with suppress(ConnectionError): + redis_conn = django_rq.get_connection() + queue_names = [ + w.queue_names()[0] for w in Worker.all(connection=redis_conn) if w.queue_names() + ] + worker_per_queue = dict(Counter(queue_names)) + for queue in RQ_QUEUES.keys(): total_compute_seconds_per_queue[queue] = sum( (p.latest_successful_run.runtime / (p.run_interval / 24)) @@ -178,17 +185,13 @@ def compute_queue_load_factor(): ) if p.latest_successful_run ) - - with suppress(ConnectionError): - redis_conn = django_rq.get_connection() - queue_names = [ - w.queue_names()[0] for w in Worker.all(connection=redis_conn) if w.queue_names() - ] - worker_per_queue = dict(Counter(queue_names)) + if queue not in worker_per_queue: + worker_per_queue[queue] = 0 for queue_name, worker_count in worker_per_queue.items(): + net_load_on_queue = "no_worker" total_compute = total_compute_seconds_per_queue.get(queue_name, 0) - if worker_count == 0 or total_compute == 0: + if total_compute == 0: continue unit_load_on_queue = total_compute / seconds_in_24_hr @@ -196,7 +199,8 @@ def compute_queue_load_factor(): num_of_worker_for_balanced_queue = round(unit_load_on_queue) addition_worker_needed = max(num_of_worker_for_balanced_queue - worker_count, 0) - net_load_on_queue = unit_load_on_queue / worker_count + if worker_count > 0: + net_load_on_queue = unit_load_on_queue / worker_count load_per_queue[queue_name] = { "load_factor": net_load_on_queue, From 6742ba61154ab5182d4a0eeb22c02129b3fe531e Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Fri, 10 Apr 2026 20:44:11 +0530 Subject: [PATCH 3/4] feat: cache computed load factor for 5 minutes Signed-off-by: Keshav Priyadarshi --- vulnerabilities/views.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index b984fbb51..5b9406f87 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -15,6 +15,7 @@ from cvss.exceptions import CVSS4MalformedError from django.contrib import messages from django.contrib.auth.views import LoginView +from django.core.cache import cache from django.core.exceptions import ValidationError from django.core.mail import send_mail from django.db.models import Exists @@ -48,6 +49,7 @@ from vulnerabilities.pipelines.v2_importers.epss_importer_v2 import EPSSImporterPipeline from vulnerabilities.severity_systems import EPSS from vulnerabilities.severity_systems import SCORING_SYSTEMS +from vulnerabilities.tasks import compute_queue_load_factor from vulnerabilities.throttling import AnonUserUIThrottle from vulnerabilities.utils import TYPES_WITH_MULTIPLE_IMPORTERS from vulnerabilities.utils import get_advisories_from_groups @@ -57,6 +59,8 @@ PAGE_SIZE = 10 +CACHE_TIMEOUT = 60 * 5 + class VulnerableCodeView(View): """ @@ -961,6 +965,13 @@ def get_queryset(self): def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) + load_per_queue = cache.get("load_per_queue") + + if load_per_queue is None: + load_per_queue = compute_queue_load_factor() + cache.set("load_per_queue", load_per_queue, CACHE_TIMEOUT) + + context["load_per_queue"] = load_per_queue context["active_pipeline_count"] = PipelineSchedule.objects.filter(is_active=True).count() context["disabled_pipeline_count"] = PipelineSchedule.objects.filter( is_active=False From 065d5a738114dc88a23f049dbcfb2f04334ddd57 Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Fri, 10 Apr 2026 20:45:18 +0530 Subject: [PATCH 4/4] feat: show load factor on pipeline dashboard Signed-off-by: Keshav Priyadarshi --- .../templates/pipeline_dashboard.html | 76 +++++++++++++++++-- 1 file changed, 71 insertions(+), 5 deletions(-) diff --git a/vulnerabilities/templates/pipeline_dashboard.html b/vulnerabilities/templates/pipeline_dashboard.html index fc474efe7..826c614ea 100644 --- a/vulnerabilities/templates/pipeline_dashboard.html +++ b/vulnerabilities/templates/pipeline_dashboard.html @@ -1,5 +1,7 @@ {% extends "base.html" %} +{% load utils %} + {% block title %} Pipeline Dashboard {% endblock %} @@ -22,6 +24,18 @@ .column { word-break: break-word; } + + .has-text-orange { + color: #ff8c42 !important; + } + + .has-tooltip-orange::before { + background-color: #ff8c42 !important; + } + + .has-tooltip-orange::after { + border-top-color: #ff8c42 !important; + } {% endblock %} @@ -48,11 +62,63 @@

Pipeline Dashboard

-
-

- {{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }}, - {{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }} -

+
+
+ {% if load_per_queue %} +

+ + Load Factor: + + {% for queue_name, values in load_per_queue.items %} + + + {{ queue_name| capfirst }} + + {% with load_factor=values|get_item:"load_factor" additional=values|get_item:"additional_worker" %} + {% if load_factor == "no_worker" %} + + + + {% elif load_factor < 1 %} + + {{ load_factor|floatformat:2 }} + + + {% elif load_factor < 1.6 %} + + {{ load_factor|floatformat:2 }} + + + {% else %} + + {{ load_factor|floatformat:2 }} + + + {% endif %} + {% endwith %} + + {% if not forloop.last %} • {% endif %} + + {% endfor %} +

+ {% endif %} +
+
+

+ {{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }}, + {{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }} +

+