From f3ea708082fac95684ca5f925bb356ced1c1705e Mon Sep 17 00:00:00 2001
From: Marius Kittler <mkittler@suse.de>
Date: Wed, 15 Jan 2020 18:59:18 +0100
Subject: [PATCH 1/4] Avoid race condition in job scheduling

* Associate workers and jobs before sending the job to the worker
* Consider all unfinished jobs in stale job handling on worker
  status updates via web sockets

This should help to avoid the problems mentioned in the ticket
https://progress.opensuse.org/issues/62015.
---
 lib/OpenQA/Resource/Jobs.pm                | 25 +++----
 lib/OpenQA/Scheduler/Model/Jobs.pm         | 80 ++++++++++------------
 lib/OpenQA/Schema/Result/Jobs.pm           | 35 +++-------
 lib/OpenQA/Schema/Result/Workers.pm        |  5 ++
 lib/OpenQA/WebSockets/Controller/Worker.pm | 44 +++++++-----
 t/04-scheduler.t                           |  5 +-
 6 files changed, 90 insertions(+), 104 deletions(-)

diff --git a/lib/OpenQA/Resource/Jobs.pm b/lib/OpenQA/Resource/Jobs.pm
index 4fcad94924..94096bd983 100644
--- a/lib/OpenQA/Resource/Jobs.pm
+++ b/lib/OpenQA/Resource/Jobs.pm
@@ -42,41 +42,32 @@ or done. Scheduled jobs can't be restarted.
 sub job_restart {
     my ($jobids) = @_ or die "missing name parameter\n";
 
-    # first, duplicate all jobs that are either running or done
+    # duplicate all jobs that are either running or done
     my $schema = OpenQA::Schema->singleton;
     my $jobs   = $schema->resultset("Jobs")->search(
         {
             id    => $jobids,
             state => [OpenQA::Jobs::Constants::EXECUTION_STATES, OpenQA::Jobs::Constants::FINAL_STATES],
         });
-
     my @duplicated;
     while (my $j = $jobs->next) {
         my $dup = $j->auto_duplicate;
         push @duplicated, $dup->{cluster_cloned} if $dup;
     }
 
-    # then tell workers to abort
-    $jobs = $schema->resultset("Jobs")->search(
+    # abort running jobs
+    my $running_jobs = $schema->resultset("Jobs")->search(
         {
             id    => $jobids,
             state => [OpenQA::Jobs::Constants::EXECUTION_STATES],
         });
-
-    $jobs->search(
-        {
-            result => OpenQA::Jobs::Constants::NONE,
-        }
-    )->update(
-        {
-            result => OpenQA::Jobs::Constants::USER_RESTARTED,
-        });
-
-    while (my $j = $jobs->next) {
+    $running_jobs->search({result => OpenQA::Jobs::Constants::NONE})
+      ->update({result => OpenQA::Jobs::Constants::USER_RESTARTED});
+    while (my $j = $running_jobs->next) {
         $j->calculate_blocked_by;
-        log_debug("enqueuing abort for " . $j->id . " " . $j->worker_id);
-        $j->worker->send_command(command => 'abort', job_id => $j->id);
+        $j->abort;
     }
+
     return @duplicated;
 }
 
diff --git a/lib/OpenQA/Scheduler/Model/Jobs.pm b/lib/OpenQA/Scheduler/Model/Jobs.pm
index 100450a409..1edc3f0880 100644
--- a/lib/OpenQA/Scheduler/Model/Jobs.pm
+++ b/lib/OpenQA/Scheduler/Model/Jobs.pm
@@ -199,13 +199,33 @@ sub schedule {
             next;
         }
 
+        # assign the jobs to the worker and then send the jobs to the worker
+        # note: The $worker->update(...) is also done when the worker sends a status update. That is
+        #       required to track the worker's current job when assigning multiple jobs to it. We still
+        #       need to set it here immediately to be sure the scheduler does not consider the worker
+        #       free anymore.
         my $res;
         try {
             if ($actual_job_count > 1) {
+                my %worker_assignment = (
+                    state              => ASSIGNED,
+                    t_started          => undef,
+                    assigned_worker_id => $worker_id,
+                );
+                $schema->txn_do(
+                    sub {
+                        $_->update(\%worker_assignment) for @jobs;
+                        $worker->set_current_job($jobs[0]);
+                    });
                 $res
                   = $self->_assign_multiple_jobs_to_worker(\@jobs, $worker, $directly_chained_job_sequence, $job_ids);
             }
             else {
+                $schema->txn_do(
+                    sub {
+                        $jobs[0]->set_assigned_worker($worker);
+                        $worker->set_current_job($jobs[0]);
+                    });
                 $res = $jobs[0]->ws_send($worker);
             }
             die "Failed contacting websocket server over HTTP" unless ref($res) eq "HASH" && exists $res->{state};
@@ -214,56 +234,30 @@ sub schedule {
             log_debug("Failed to send data to websocket, reason: $_");
         };
 
-        if (ref($res) eq "HASH" && $res->{state}->{msg_sent} == 1) {
+        if (ref($res) eq 'HASH' && $res->{state} && $res->{state}->{msg_sent} == 1) {
             log_debug("Sent job(s) '$job_ids_str' to worker '$worker_id'");
+            push(@successfully_allocated, map { {job => $_, worker => $worker_id} } @$job_ids);
+            next;
+        }
 
-            # associate the worker to the job, so the worker can send updates
-            try {
-                if ($actual_job_count > 1) {
-                    my %worker_assignment = (
-                        state              => ASSIGNED,
-                        t_started          => undef,
-                        assigned_worker_id => $worker_id,
-                    );
-                    $_->update(\%worker_assignment) for @jobs;
-                    $worker->update({job_id => $first_job_id});
-                    # note: The job_id column of the workers table is updated as soon as the worker progresses
-                    #       to the next job so the actually current job and current module can be displayed.
-                }
-                else {
-                    if ($jobs[0]->set_assigned_worker($worker)) {
-                        push(@successfully_allocated, {job => $first_job_id, worker => $worker_id});
-                    }
-                    else {
-                        # Send abort and reschedule if we fail associating the job to the worker
-                        $jobs[0]->reschedule_rollback($worker);
-                    }
-                }
-            }
-            catch {
-                log_debug("Failed to set worker in scheduling state, reason: $_");
-            };
+        # reset worker and jobs on failure
+        log_debug("Failed sending job(s) '$job_ids_str' to worker '$worker_id'");
+        try {
+            $schema->txn_do(sub { $worker->unprepare_for_work; });
         }
-        else {
-            # reset worker and jobs on failure
-            log_debug("Failed sending job(s) '$job_ids_str' to worker '$worker_id'");
+        catch {
+            log_debug("Failed resetting unprepare worker, reason: $_");
+        };
+        for my $job (@jobs) {
             try {
-                $worker->unprepare_for_work;
+                # remove the associated worker and be sure to be in scheduled state.
+                $schema->txn_do(sub { $job->reschedule_state; });
             }
             catch {
-                log_debug("Failed resetting unprepare worker, reason: $_");
+                # if we see this, we are in a really bad state
+                my $job_id = $job->id;
+                log_debug("Failed resetting job '$job_id' to scheduled state, reason: $_");
             };
-            for my $job (@jobs) {
-                try {
-                    # Remove the associated worker and be sure to be in scheduled state.
-                    $job->reschedule_state;
-                }
-                catch {
-                    # Again: If we see this, we are in a really bad state.
-                    my $job_id = $job->id;
-                    log_debug("Failed resetting job '$job_id' to scheduled state, reason: $_");
-                };
-            }
         }
     }
 
diff --git a/lib/OpenQA/Schema/Result/Jobs.pm b/lib/OpenQA/Schema/Result/Jobs.pm
index 23c2de6317..1302743642 100644
--- a/lib/OpenQA/Schema/Result/Jobs.pm
+++ b/lib/OpenQA/Schema/Result/Jobs.pm
@@ -343,34 +343,18 @@ sub reschedule_state {
     }
 }
 
-sub reschedule_rollback {
-    my ($self, $worker) = @_;
-    $self->scheduler_abort($worker)
-      ;    # TODO: This might become a problem if we have duplicated job IDs from 2 or more web UIs.
-           #       Workers should be able to kill a job checking the (job token + job id) instead.
-    $self->reschedule_state;
-}
-
 sub set_assigned_worker {
     my ($self, $worker) = @_;
-    return 0 unless $worker;
 
+    my $job_id    = $self->id;
+    my $worker_id = $worker->id;
     $self->update(
         {
             state              => ASSIGNED,
             t_started          => undef,
-            assigned_worker_id => $worker->id,
+            assigned_worker_id => $worker_id,
         });
-
-    $worker->update({job_id => $self->id});
-
-    if ($worker->job->id eq $self->id) {
-        log_debug("Job '" . $self->id . "' has worker '" . $worker->id . "' assigned");
-        return 1;
-    }
-    else {
-        return 0;
-    }
+    log_debug("Job '$job_id' has worker '$worker_id' assigned");
 }
 
 sub prepare_for_work {
@@ -925,10 +909,13 @@ sub _cluster_cloned {
 }
 
 sub abort {
-    my $self = shift;
-    return unless $self->worker;
-    log_debug("[Job#" . $self->id . "] Sending abort command");
-    $self->worker->send_command(command => 'abort', job_id => $self->id);
+    my $self   = shift;
+    my $worker = $self->worker;
+    return undef unless $worker;
+
+    my ($job_id, $worker_id) = ($self->id, $worker->id);
+    log_debug("Sending abort command to worker $worker_id for job $job_id");
+    $worker->send_command(command => 'abort', job_id => $job_id);
 }
 
 sub scheduler_abort {
diff --git a/lib/OpenQA/Schema/Result/Workers.pm b/lib/OpenQA/Schema/Result/Workers.pm
index 929b21e751..a66f23b11b 100644
--- a/lib/OpenQA/Schema/Result/Workers.pm
+++ b/lib/OpenQA/Schema/Result/Workers.pm
@@ -272,4 +272,9 @@ sub unfinished_jobs {
     return $self->previous_jobs->search({t_finished => undef});
 }
 
+sub set_current_job {
+    my ($self, $job) = @_;
+    $self->update({job_id => $job->id});
+}
+
 1;
diff --git a/lib/OpenQA/WebSockets/Controller/Worker.pm b/lib/OpenQA/WebSockets/Controller/Worker.pm
index 5fe98ec880..10eff009ba 100644
--- a/lib/OpenQA/WebSockets/Controller/Worker.pm
+++ b/lib/OpenQA/WebSockets/Controller/Worker.pm
@@ -147,51 +147,59 @@ sub _message {
             log_debug("Could not send the population number to worker: $_");
         };
 
-# find the job currently associated with that worker and check whether the worker still executes the job it is supposed to
+        # find the job currently associated with that worker and check whether the worker still
+        # executes the job it is supposed to
         try {
             my $worker = $schema->resultset('Workers')->find($wid);
             return undef unless $worker;
 
-            my $registered_job_id;
+            my $current_job_id;
             my $registered_job_token;
-            my $registered_job_state;
-            my $registered_job = $worker->job;
-            if ($registered_job) {
-                $registered_job_id    = $registered_job->id;
-                $registered_job_state = $registered_job->state;
+            my $current_job_state;
+            my @unfinished_jobs = $worker->unfinished_jobs;
+            my $current_job     = $worker->job // $unfinished_jobs[0];
+            if ($current_job) {
+                $current_job_id    = $current_job->id;
+                $current_job_state = $current_job->state;
             }
 
             # log debugging info
-            log_debug("Found job $registered_job_id in DB from worker_status update sent by worker $wid")
-              if defined $registered_job_id;
+            log_debug("Found job $current_job_id in DB from worker_status update sent by worker $wid")
+              if defined $current_job_id;
             log_debug("Received request has job id: $job_id")
               if defined $job_id;
             $registered_job_token = $worker->get_property('JOBTOKEN');
-            log_debug("Worker $wid for job $registered_job_id has token $registered_job_token")
-              if defined $registered_job_id && defined $registered_job_token;
+            log_debug("Worker $wid for job $current_job_id has token $registered_job_token")
+              if defined $current_job_id && defined $registered_job_token;
             log_debug("Received request has token: $job_token")
               if defined $job_token;
 
-            # skip any further actions if worker just does the job we expected it to do
+            # skip any further actions if worker just does the one job we expected it to do
             return undef
               if ( defined $job_id
-                && defined $registered_job_id
+                && defined $current_job_id
                 && defined $job_token
                 && defined $registered_job_token
-                && $job_id eq $registered_job_id
+                && $job_id eq $current_job_id
                 && (my $job_token_correct = $job_token eq $registered_job_token)
-                && OpenQA::Jobs::Constants::meta_state($registered_job_state) eq OpenQA::Jobs::Constants::EXECUTION);
+                && OpenQA::Jobs::Constants::meta_state($current_job_state) eq OpenQA::Jobs::Constants::EXECUTION)
+              && (scalar @unfinished_jobs <= 1);
 
             # handle the case when the worker does not work on the job(s) it is supposed to work on
-            my @all_jobs_currently_associated_with_worker = ($registered_job, $worker->unfinished_jobs);
+            my @all_jobs_currently_associated_with_worker = ($current_job, @unfinished_jobs);
+            my %considered_jobs;
             for my $associated_job (@all_jobs_currently_associated_with_worker) {
                 next unless defined $associated_job;
 
+                # prevent doing this twice for the same job ($current_job and @unfinished_jobs might overlap)
+                my $job_id = $associated_job->id;
+                next if exists $considered_jobs{$job_id};
+                $considered_jobs{$job_id} = 1;
+
                 # do nothing if the job token is corrent and the worker claims that it is still working on that job
                 # or that the job is still pending
                 if ($job_token_correct) {
-                    my $job_id = $associated_job->id;
-                    next if $job_id eq $registered_job_id;
+                    next if $job_id eq $current_job_id;
                     next if exists $pending_job_ids->{$job_id};
                 }
 
diff --git a/t/04-scheduler.t b/t/04-scheduler.t
index b62ebfcd62..cf086fb2a9 100644
--- a/t/04-scheduler.t
+++ b/t/04-scheduler.t
@@ -332,8 +332,9 @@ is(scalar(@{$rjobs_before}) + 1,             scalar(@{$rjobs_after}), "number of
 is($rjobs_after->[-1]->{assigned_worker_id}, 1,                       'assigned worker set');
 
 $grabbed = job_get($job->id);
-is($grabbed->worker->id, $worker->{id}, 'correct worker assigned');
-is($grabbed->state,      ASSIGNED,      'job is in assigned state');
+is($grabbed->assigned_worker_id, $worker->{id}, 'worker assigned to job');
+is($grabbed->worker->id,         $worker->{id}, 'job assigned to worker');
+is($grabbed->state,              ASSIGNED,      'job is in assigned state');
 
 # register worker again with no job while the web UI thinks it has an assigned job
 is(register_worker, $id, 'worker re-registered');

From 187a5ed85fdc8ac350a8b0141643c8d3acb9e081 Mon Sep 17 00:00:00 2001
From: Marius Kittler <mkittler@suse.de>
Date: Thu, 16 Jan 2020 15:15:57 +0100
Subject: [PATCH 2/4] Simplify auto_duplicate code

---
 lib/OpenQA/Schema/Result/Jobs.pm | 41 ++++++++++++--------------------
 t/05-scheduler-full.t            |  6 ++++-
 2 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/lib/OpenQA/Schema/Result/Jobs.pm b/lib/OpenQA/Schema/Result/Jobs.pm
index 1302743642..c8cb835519 100644
--- a/lib/OpenQA/Schema/Result/Jobs.pm
+++ b/lib/OpenQA/Schema/Result/Jobs.pm
@@ -853,47 +853,35 @@ sub auto_duplicate {
     # set this clone was triggered by manually if it's not auto-clone
     $args->{dup_type_auto} //= 0;
 
+    my $job_id = $self->id;
     my $clones = $self->duplicate($args);
-
-    unless ($clones) {
-        log_debug('duplication failed');
-        return;
+    if (!$clones) {
+        log_debug("Duplication of job $job_id failed");
+        return undef;
     }
+
     # abort jobs in the old cluster (exclude the original $args->{jobid})
     my $rsource = $self->result_source;
     my $jobs    = $rsource->schema->resultset("Jobs")->search(
         {
-            id    => {'!=' => $self->id, '-in' => [keys %$clones]},
+            id    => {'!=' => $job_id, '-in' => [keys %$clones]},
             state => [PRE_EXECUTION_STATES, EXECUTION_STATES],
         });
 
-    $jobs->search(
-        {
-            result => NONE,
-        }
-    )->update(
-        {
-            result => PARALLEL_RESTARTED,
-        });
+    $jobs->search({result => NONE})->update({result => PARALLEL_RESTARTED});
 
     while (my $j = $jobs->next) {
-        if ($j->worker) {
-            log_debug("enqueuing abort for " . $j->id . " " . $j->worker_id);
-            $j->worker->send_command(command => 'abort', job_id => $j->id);
-        }
-        else {
-            if ($j->state eq SCHEDULED) {
-                $j->release_networks;
-                $j->update({state => CANCELLED});
-            }
-        }
+        next if $j->abort;
+        next unless $j->state eq SCHEDULED;
+        $j->release_networks;
+        $j->update({state => CANCELLED});
     }
 
-    log_debug('new job ' . $clones->{$self->id}->{clone});
+    log_debug('new job ' . $clones->{$job_id}->{clone});
 
     # Attach all clones mapping to new job object
     # TODO: better return a proper hash here
-    my $dup = $rsource->resultset->find($clones->{$self->id}->{clone});
+    my $dup = $rsource->resultset->find($clones->{$job_id}->{clone});
     $dup->_cluster_cloned($clones);
     return $dup;
 }
@@ -911,11 +899,12 @@ sub _cluster_cloned {
 sub abort {
     my $self   = shift;
     my $worker = $self->worker;
-    return undef unless $worker;
+    return 0 unless $worker;
 
     my ($job_id, $worker_id) = ($self->id, $worker->id);
     log_debug("Sending abort command to worker $worker_id for job $job_id");
     $worker->send_command(command => 'abort', job_id => $job_id);
+    return 1;
 }
 
 sub scheduler_abort {
diff --git a/t/05-scheduler-full.t b/t/05-scheduler-full.t
index 0a8d95ab92..59bad17e09 100644
--- a/t/05-scheduler-full.t
+++ b/t/05-scheduler-full.t
@@ -199,7 +199,11 @@ subtest 'Simulation of heavy unstable load' => sub {
     dead_workers($schema);
     my @duplicated;
 
-    push(@duplicated, $_->auto_duplicate()) for $schema->resultset("Jobs")->latest_jobs;
+    # duplicate latest jobs ignoring failures
+    for my $job ($schema->resultset('Jobs')->latest_jobs) {
+        my $duplicate = $job->auto_duplicate;
+        push(@duplicated, $duplicate) if defined $duplicate;
+    }
 
     push(@workers, unresponsive_worker($k->key, $k->secret, "http://localhost:$mojoport", $_)) for (1 .. 50);
     my $i = 4;

From 149e6423293562a03a471429b149895e72269ce2 Mon Sep 17 00:00:00 2001
From: Marius Kittler <mkittler@suse.de>
Date: Thu, 16 Jan 2020 15:18:46 +0100
Subject: [PATCH 3/4] Consider assigned jobs when duplicating jobs

When aborting jobs in the old cluster, treat assigned
jobs in the same way as scheduled jobs if the worker
hasn't accepted the job yet.
---
 lib/OpenQA/Schema/Result/Jobs.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/OpenQA/Schema/Result/Jobs.pm b/lib/OpenQA/Schema/Result/Jobs.pm
index c8cb835519..06c0213f71 100644
--- a/lib/OpenQA/Schema/Result/Jobs.pm
+++ b/lib/OpenQA/Schema/Result/Jobs.pm
@@ -872,7 +872,7 @@ sub auto_duplicate {
 
     while (my $j = $jobs->next) {
         next if $j->abort;
-        next unless $j->state eq SCHEDULED;
+        next unless $j->state eq SCHEDULED || $j->state eq ASSIGNED;
         $j->release_networks;
         $j->update({state => CANCELLED});
     }

From 127a9c0e380d10dc80f2ea74d57e2aa01504d1a3 Mon Sep 17 00:00:00 2001
From: Marius Kittler <mkittler@suse.de>
Date: Thu, 16 Jan 2020 15:21:54 +0100
Subject: [PATCH 4/4] Improve coding style in scheduler tests

---
 t/05-scheduler-capabilities.t |  20 ++--
 t/05-scheduler-dependencies.t | 176 +++++++++++++++++-----------------
 t/05-scheduler-full.t         |  24 ++---
 3 files changed, 107 insertions(+), 113 deletions(-)

diff --git a/t/05-scheduler-capabilities.t b/t/05-scheduler-capabilities.t
index 636fe9a990..886da525f5 100644
--- a/t/05-scheduler-capabilities.t
+++ b/t/05-scheduler-capabilities.t
@@ -24,14 +24,14 @@ use lib "$FindBin::Bin/lib";
 use OpenQA::Scheduler::Model::Jobs;
 use OpenQA::Constants 'WEBSOCKET_API_VERSION';
 use OpenQA::Test::Database;
+use OpenQA::WebAPI::Controller::API::V1::Worker;
 use Test::Mojo;
 use Test::More;
 use Test::Warnings;
 use Mojo::Util 'monkey_patch';
 
-my $schema = OpenQA::Test::Database->new->create;    #(skip_fixtures => 1);
-
-my $sent = {};
+my $schema = OpenQA::Test::Database->new->create;
+my $sent   = {};
 
 OpenQA::Scheduler::Model::Jobs->singleton->shuffle_workers(0);
 
@@ -55,18 +55,13 @@ monkey_patch 'OpenQA::Schema::Result::Jobs', ws_send => sub {
     return {state => {msg_sent => 1}};
 };
 
-
-#my $t = Test::Mojo->new('OpenQA::WebAPI');
-
 sub list_jobs {
     my %args = @_;
     [map { $_->to_hash(assets => 1) } $schema->resultset('Jobs')->complex_query(%args)->all];
 }
 
-my $current_jobs = list_jobs();
-#diag explain $current_jobs;
-
-my %settings = (
+my $current_jobs = list_jobs;
+my %settings     = (
     DISTRI      => 'Unicorn',
     FLAVOR      => 'pink',
     VERSION     => '42',
@@ -174,9 +169,7 @@ $jobH->set_prio(8);
 $jobI->set_prio(10);
 $jobJ->set_prio(9);
 
-use OpenQA::WebAPI::Controller::API::V1::Worker;
-my $c = OpenQA::WebAPI::Controller::API::V1::Worker->new;
-
+my $c     = OpenQA::WebAPI::Controller::API::V1::Worker->new;
 my $w1_id = $c->_register($schema, "host", "1", \%workercaps64_client);
 my $w2_id = $c->_register($schema, "host", "2", \%workercaps64_server);
 my $w3_id = $c->_register($schema, "host", "3", \%workercaps32);
@@ -220,7 +213,6 @@ is($job->{id}, $jobJ->id,
 $job = $sent->{$w9_id}->{job}->to_hash;
 is($job->{id}, $jobI->id, "this worker can do jobI, child - client");
 
-
 # job G is not grabbed because there is no worker with class 'special'
 
 done_testing();
diff --git a/t/05-scheduler-dependencies.t b/t/05-scheduler-dependencies.t
index 8437438f29..1b74aeb6f7 100644
--- a/t/05-scheduler-dependencies.t
+++ b/t/05-scheduler-dependencies.t
@@ -1,6 +1,6 @@
 #!/usr/bin/env perl -w
 
-# Copyright (C) 2014-2019 SUSE LLC
+# Copyright (C) 2014-2020 SUSE LLC
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -169,7 +169,7 @@ OpenQA::Scheduler::Model::Jobs->singleton->shuffle_workers(0);
 sub schedule {
     my $id = OpenQA::Scheduler::Model::Jobs->singleton->schedule();
     for my $i (@$id) {
-        _jobs_update_state([$schema->resultset('Jobs')->find($i->{job})], OpenQA::Jobs::Constants::RUNNING);
+        _jobs_update_state([$schema->resultset('Jobs')->find($i->{job})], RUNNING);
     }
 }
 
@@ -180,7 +180,7 @@ $jobs_result_mock->mock(
     ws_send => sub {
         my ($self, $worker) = @_;
         my $hashref = $self->prepare_for_work($worker);
-        _jobs_update_state([$self], OpenQA::Jobs::Constants::RUNNING);
+        _jobs_update_state([$self], RUNNING);
 
         $hashref->{assigned_worker_id} = $worker->id;
         $sent->{$worker->id} = {worker => $worker, job => $self, jobhash => $hashref};
@@ -427,35 +427,37 @@ $result = $jobE->done(result => 'incomplete');
 is($result, 'incomplete', 'job_set_done on E');
 
 my $job = job_get_deps($jobA->id);
-is($job->{state},  "done",   "job_set_done changed state");
-is($job->{result}, "failed", "job_set_done changed result");
+is($job->{state},  DONE,   'job_set_done changed state');
+is($job->{result}, FAILED, 'job_set_done changed result');
 
 $job = job_get_deps($jobB->id);
-is($job->{state}, "running", "job_set_done changed state");
+is($job->{state}, RUNNING, 'job_set_done changed state');
 
 $job = job_get_deps($jobC->id);
-is($job->{state}, "running", "job_set_done changed state");
+is($job->{state}, RUNNING, 'job_set_done changed state');
 
 $job = job_get_deps($jobD->id);
-is($job->{state},  "done",            "job_set_done changed state");
-is($job->{result}, "parallel_failed", "job_set_done changed result, jobD failed because of jobA");
+is($job->{state},  DONE,            'job_set_done changed state');
+is($job->{result}, PARALLEL_FAILED, 'job_set_done changed result, jobD failed because of jobA');
 
 $job = job_get_deps($jobE->id);
-is($job->{state},  "done",            "job_set_done changed state");
-is($job->{result}, "parallel_failed", "job_set_done changed result, jobE failed because of jobD");
+is($job->{state},  DONE,            'job_set_done changed state');
+is($job->{result}, PARALLEL_FAILED, 'job_set_done changed result, jobE failed because of jobD');
 
 $jobF->discard_changes;
 $job = job_get_deps($jobF->id);
-is($job->{state}, "running", "job_set_done changed state");
+is($job->{state}, RUNNING, 'job_set_done changed state');
 
 # check MM API for children status - available only for running jobs
 my $worker = $schema->resultset("Workers")->find($worker_ids[1]);
 my $t      = Test::Mojo->new('OpenQA::WebAPI');
 
+my $job_token = $sent->{job}->{$jobC->id}->{worker}->get_property('JOBTOKEN');
+isnt($job_token, undef, 'JOBTOKEN is present');
 $t->ua->on(
     start => sub {
         my ($ua, $tx) = @_;
-        $tx->req->headers->add('X-API-JobToken' => $sent->{job}->{$jobC->id}->{worker}->get_property('JOBTOKEN'));
+        $tx->req->headers->add('X-API-JobToken' => $job_token);
     });
 $t->get_ok('/api/v1/mm/children/running')->status_is(200)->json_is('/jobs' => [$jobF->id])
   ->or(sub { diag explain $t->tx->res->content });
@@ -469,47 +471,47 @@ my $id = $jobF->auto_duplicate;
 ok(defined $id, "duplicate works");
 
 $job = job_get_deps($jobA->id);    # cloned
-is($job->{state},  "done",   "no change");
-is($job->{result}, "failed", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state},  DONE,   'no change');
+is($job->{result}, FAILED, 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 
 $job = job_get_deps($jobB->id);    # cloned
 is($job->{result}, "parallel_failed", "$job->{id} B stopped");
-ok(defined $job->{clone_id}, "cloned");
+ok(defined $job->{clone_id}, 'cloned');
 my $jobB2 = $job->{clone_id};
 
 $job = job_get_deps($jobC->id);    # cloned
-is($job->{state}, "running", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state}, RUNNING, 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 my $jobC2 = $job->{clone_id};
 
 $job = job_get_deps($jobD->id);    # cloned
-is($job->{state},  "done",            "no change");
-is($job->{result}, "parallel_failed", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state},  DONE,              'no change');
+is($job->{result}, "parallel_failed", 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 
 $job = job_get_deps($jobE->id);    # cloned
-is($job->{state},  "done",            "no change");
-is($job->{result}, "parallel_failed", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state},  DONE,              'no change');
+is($job->{result}, "parallel_failed", 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 
 $job = job_get_deps($jobF->id);    # cloned
-is($job->{state}, "running", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state}, RUNNING, 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 my $jobF2 = $job->{clone_id};
 
 $job = job_get_deps($jobB2);
-is($job->{state},    "scheduled", "cloned jobs are scheduled");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, "cloned jobs are scheduled");
+is($job->{clone_id}, undef,     'no clones');
 
 $job = job_get_deps($jobC2);
-is($job->{state},    "scheduled", "cloned jobs are scheduled");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, "cloned jobs are scheduled");
+is($job->{clone_id}, undef,     'no clones');
 is_deeply($job->{parents}, {Parallel => [$jobB2], Chained => [], 'Directly chained' => []}, "cloned deps");
 
 $job = job_get_deps($jobF2);
-is($job->{state},    "scheduled", "cloned jobs are scheduled");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, "cloned jobs are scheduled");
+is($job->{clone_id}, undef,     'no clones');
 is_deeply($job->{parents}, {Parallel => [$jobC2], Chained => [], 'Directly chained' => []}, "cloned deps");
 
 # recheck that cloning didn't change MM API results children status
@@ -518,65 +520,65 @@ $t->get_ok('/api/v1/mm/children/scheduled')->status_is(200)->json_is('/jobs' =>
 $t->get_ok('/api/v1/mm/children/done')->status_is(200)->json_is('/jobs' => [$jobE->id]);
 
 $job = job_get_deps($jobA->id);    # cloned
-is($job->{state},  "done",   "no change");
-is($job->{result}, "failed", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state},  DONE,   'no change');
+is($job->{result}, FAILED, 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 my $jobA2 = $job->{clone_id};
 
 $job = job_get_deps($jobB->id);    # unchanged
 is($job->{result},   "parallel_failed", "B is unchanged");
-is($job->{clone_id}, $jobB2,            "cloned");
+is($job->{clone_id}, $jobB2,            'cloned');
 
 $job = job_get_deps($jobC->id);    # unchanged
-is($job->{state},    "running",         "no change");
+is($job->{state},    RUNNING,           'no change');
 is($job->{result},   "parallel_failed", "C is restarted");
-is($job->{clone_id}, $jobC2,            "cloned");
+is($job->{clone_id}, $jobC2,            'cloned');
 
 $job = job_get_deps($jobD->id);    #cloned
-is($job->{state},  "done",            "no change");
-is($job->{result}, "parallel_failed", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state},  DONE,              'no change');
+is($job->{result}, "parallel_failed", 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 my $jobD2 = $job->{clone_id};
 
 $job = job_get_deps($jobE->id);    #cloned
-is($job->{state},  "done",            "no change");
-is($job->{result}, "parallel_failed", "no change");
-ok(defined $job->{clone_id}, "cloned");
+is($job->{state},  DONE,              'no change');
+is($job->{result}, "parallel_failed", 'no change');
+ok(defined $job->{clone_id}, 'cloned');
 my $jobE2 = $job->{clone_id};
 
 $job = job_get_deps($jobF->id);    # unchanged
-is($job->{state},    "running", "no change");
-is($job->{clone_id}, $jobF2,    "cloned");
+is($job->{state},    RUNNING, 'no change');
+is($job->{clone_id}, $jobF2,  'cloned');
 
 $job = job_get_deps($jobA2);
-is($job->{state},    "scheduled", "no change");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, 'no change');
+is($job->{clone_id}, undef,     'no clones');
 is_deeply($job->{parents}, {Parallel => [], Chained => [], 'Directly chained' => []}, "cloned deps");
 
 $job = job_get_deps($jobB2);
-is($job->{state},    "scheduled", "no change");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, 'no change');
+is($job->{clone_id}, undef,     'no clones');
 is_deeply($job->{parents}, {Parallel => [], Chained => [], 'Directly chained' => []}, "cloned deps");
 
 $job = job_get_deps($jobC2);
-is($job->{state},    "scheduled", "no change");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, 'no change');
+is($job->{clone_id}, undef,     'no clones');
 is_deeply($job->{parents}, {Parallel => [$jobB2], Chained => [], 'Directly chained' => []}, "cloned deps");
 
 
 $job = job_get_deps($jobD2);
-is($job->{state},    "scheduled", "no change");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, 'no change');
+is($job->{clone_id}, undef,     'no clones');
 is_deeply($job->{parents}, {Parallel => [$jobA2], Chained => [], 'Directly chained' => []}, "cloned deps");
 
 $job = job_get_deps($jobE2);
-is($job->{state},    "scheduled", "no change");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, 'no change');
+is($job->{clone_id}, undef,     'no clones');
 is_deeply([sort @{$job->{parents}->{Parallel}}], [sort ($jobC2, $jobD2)], "cloned deps");
 
 $job = job_get_deps($jobF2);
-is($job->{state},    "scheduled", "no change");
-is($job->{clone_id}, undef,       "no clones");
+is($job->{state},    SCHEDULED, 'no change');
+is($job->{clone_id}, undef,     'no clones');
 is_deeply($job->{parents}, {Parallel => [$jobC2], Chained => [], 'Directly chained' => []}, "cloned deps");
 
 # now we have:
@@ -614,7 +616,7 @@ $t->get_ok('/api/v1/mm/children/done')->status_is(200)->json_is('/jobs' => [$job
 # We have 3 free workers (as B,C and F are still running)
 # and the cluster is 6, so we expect nothing to be SCHEDULED
 schedule();
-is(job_get_deps($jobA2)->{state}, "scheduled", "no change");
+is(job_get_deps($jobA2)->{state}, SCHEDULED, 'job still scheduled');
 
 # now free two of them and create one more worker. So that we
 # have 6 free, but have vlan 1 still busy
@@ -686,8 +688,8 @@ is_deeply(
 # when Y is scheduled and X is duplicated, Y must be cancelled and Y2 needs to depend on X2
 my $jobX2 = $jobX->auto_duplicate;
 $jobY->discard_changes;
-is($jobY->state,  OpenQA::Jobs::Constants::CANCELLED,          'jobY was cancelled');
-is($jobY->result, OpenQA::Jobs::Constants::PARALLEL_RESTARTED, 'jobY was skipped');
+is($jobY->state,  CANCELLED,          'jobY was cancelled');
+is($jobY->result, PARALLEL_RESTARTED, 'jobY was skipped');
 my $jobY2 = $jobY->clone;
 ok(defined $jobY2, "jobY was cloned too");
 is($jobY2->blocked_by_id, $jobX2->id, "JobY2 is blocked");
@@ -771,7 +773,7 @@ my $jobJ = _job_create(\%settingsJ, [$jobH->id]);
 my $jobL = _job_create(\%settingsL, [$jobJ->id]);
 
 # hack jobs to appear running to scheduler
-_jobs_update_state([$jobH, $jobJ, $jobK, $jobL], OpenQA::Jobs::Constants::RUNNING);
+_jobs_update_state([$jobH, $jobJ, $jobK, $jobL], RUNNING);
 
 # expected output after cloning D, all jobs scheduled
 # H2 <-(parallel) J2
@@ -835,8 +837,8 @@ is($jobW->blocked_by_id,  $jobQ->id, 'JobW is blocked by job supposed to run bef
 #       later. Neverthless, let's explicitly assert this behavior so we know what we have right now.
 
 # hack jobs to appear to scheduler in desired state
-_jobs_update_state([$jobQ],                              OpenQA::Jobs::Constants::DONE);
-_jobs_update_state([$jobW, $jobU, $jobR, $jobT, $jobTA], OpenQA::Jobs::Constants::RUNNING);
+_jobs_update_state([$jobQ],                              DONE);
+_jobs_update_state([$jobW, $jobU, $jobR, $jobT, $jobTA], RUNNING);
 
 # duplicate job U
 # expected state (excluding TA2 which is just the same as T2 just directly chained to Q):
@@ -950,7 +952,7 @@ my $jobO = _job_create(\%settingsO, [$jobP->id]);
 my $jobI = _job_create(\%settingsI, [$jobO->id]);
 
 # hack jobs to appear to scheduler in desired state
-_jobs_update_state([$jobP, $jobO, $jobI], OpenQA::Jobs::Constants::DONE);
+_jobs_update_state([$jobP, $jobO, $jobI], DONE);
 
 # cloning O gets to expected state
 #
@@ -979,9 +981,9 @@ $jobO2 = $schema->resultset('Jobs')->search({id => $jobO2->{id}})->single;
 $jobP2 = $schema->resultset('Jobs')->search({id => $jobP2->{id}})->single;
 $jobI2 = $schema->resultset('Jobs')->search({id => $jobI2->{id}})->single;
 # set P2 running and O2 done
-_jobs_update_state([$jobP2], OpenQA::Jobs::Constants::RUNNING);
-_jobs_update_state([$jobO2], OpenQA::Jobs::Constants::DONE);
-_jobs_update_state([$jobI2], OpenQA::Jobs::Constants::DONE);
+_jobs_update_state([$jobP2], RUNNING);
+_jobs_update_state([$jobO2], DONE);
+_jobs_update_state([$jobI2], DONE);
 
 # cloning I gets to expected state:
 # P3 <-(parallel) O3 <-(parallel) I2
@@ -1015,10 +1017,10 @@ $jobC = _job_create(\%settingsC, undef, [$jobA->id]);
 $jobD = _job_create(\%settingsD, undef, [$jobA->id]);
 
 # hack jobs to appear done to scheduler
-_jobs_update_state([$jobA, $jobB, $jobC, $jobD], OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::PASSED);
+_jobs_update_state([$jobA, $jobB, $jobC, $jobD], DONE, PASSED);
 
 # only job B failed as incomplete
-$jobB->result(OpenQA::Jobs::Constants::INCOMPLETE);
+$jobB->result(INCOMPLETE);
 $jobB->update;
 
 # situation, all chained and done, B is incomplete:
@@ -1052,7 +1054,7 @@ is_deeply($jobD_h->{parents}->{Chained}, [$jobA->id], 'jobD has jobA as chained
 is($jobD_h->{settings}{TEST}, $jobD->TEST, 'jobBc test and jobB test are equal');
 
 # hack jobs to appear running to scheduler
-$jobB->clone->state(OpenQA::Jobs::Constants::RUNNING);
+$jobB->clone->state(RUNNING);
 $jobB->clone->update;
 
 # clone A
@@ -1062,7 +1064,7 @@ $jobA2 = $jobA->auto_duplicate;
 ok($jobA2, 'jobA duplicated');
 $jobA->discard_changes;
 
-$jobA->clone->state(OpenQA::Jobs::Constants::RUNNING);
+$jobA->clone->state(RUNNING);
 $jobA->clone->update;
 $jobA2 = $jobA->clone->auto_duplicate;
 ok($jobA2, 'jobA->clone duplicated');
@@ -1112,8 +1114,8 @@ $jobC = _job_create(\%settingsC, undef, [$jobA->id]);
 $jobD = _job_create(\%settingsD, undef, [$jobA->id]);
 
 # hack jobs to appear done to scheduler
-_jobs_update_state([$jobA], OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::PASSED);
-_jobs_update_state([$jobB, $jobC, $jobD], OpenQA::Jobs::Constants::RUNNING);
+_jobs_update_state([$jobA], DONE, PASSED);
+_jobs_update_state([$jobB, $jobC, $jobD], RUNNING);
 
 $jobA2 = $jobA->auto_duplicate;
 $_->discard_changes for ($jobA, $jobB, $jobC, $jobD);
@@ -1127,7 +1129,7 @@ for ($jobB, $jobC, $jobD) {
 # set jobA2 as running and clone it
 $jobA2 = $jobA->clone;
 is($jobA2->id, $jobA2->id, 'jobA2 is indeed jobA clone');
-$jobA2->state(OpenQA::Jobs::Constants::RUNNING);
+$jobA2->state(RUNNING);
 $jobA2->update;
 my $jobA3 = $jobA2->auto_duplicate;
 ok($jobA3, "cloned A2");
@@ -1158,8 +1160,8 @@ my $duplicate_test = sub {
     $jobD = _job_create(\%settingsD, [$jobB->id], [$jobA->id]);
 
     # hack jobs to appear done to scheduler
-    _jobs_update_state([$jobA],               OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::PASSED);
-    _jobs_update_state([$jobB, $jobC, $jobD], OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::FAILED);
+    _jobs_update_state([$jobA],               DONE, PASSED);
+    _jobs_update_state([$jobB, $jobC, $jobD], DONE, FAILED);
 
     $jobA2 = $jobA->auto_duplicate;
     $_->discard_changes for ($jobA, $jobB, $jobC, $jobD);
@@ -1181,7 +1183,7 @@ sub _job_create_set_done {
     my ($settings, $state) = @_;
     my $job = _job_create($settings);
     # hack jobs to appear done to scheduler
-    _jobs_update_state([$job], $state, OpenQA::Jobs::Constants::PASSED);
+    _jobs_update_state([$job], $state, PASSED);
     return $job;
 }
 
@@ -1221,23 +1223,23 @@ my $slepos_test_workers = sub {
     $settingsT{TEST} = 'Terminal';
 
     # Support server
-    my $jobSUS = _job_create_set_done(\%settingsSUS, OpenQA::Jobs::Constants::DONE);
+    my $jobSUS = _job_create_set_done(\%settingsSUS, DONE);
     # Admin Server 1
     $settingsAS{_PARALLEL_JOBS} = [$jobSUS->id];
-    my $jobAS = _job_create_set_done(\%settingsAS, OpenQA::Jobs::Constants::DONE);
+    my $jobAS = _job_create_set_done(\%settingsAS, DONE);
     # Image server 2
     $settingsIS2{_START_AFTER_JOBS} = [$jobAS->id];
-    my $jobIS2 = _job_create_set_done(\%settingsIS2, OpenQA::Jobs::Constants::DONE);
+    my $jobIS2 = _job_create_set_done(\%settingsIS2, DONE);
     # Image server
     $settingsIS{_PARALLEL_JOBS}    = [$jobSUS->id];
     $settingsIS{_START_AFTER_JOBS} = [$jobAS->id];
-    my $jobIS = _job_create_set_done(\%settingsIS, OpenQA::Jobs::Constants::CANCELLED);
+    my $jobIS = _job_create_set_done(\%settingsIS, CANCELLED);
     # Branch server
     $settingsBS{_PARALLEL_JOBS} = [$jobAS->id, $jobSUS->id];
-    my $jobBS = _job_create_set_done(\%settingsBS, OpenQA::Jobs::Constants::DONE);
+    my $jobBS = _job_create_set_done(\%settingsBS, DONE);
     # Terminal
     $settingsT{_PARALLEL_JOBS} = [$jobBS->id];
-    my $jobT = _job_create_set_done(\%settingsT, OpenQA::Jobs::Constants::DONE);
+    my $jobT = _job_create_set_done(\%settingsT, DONE);
     # clone terminal
     $jobT->duplicate;
     $_->discard_changes for ($jobSUS, $jobAS, $jobIS, $jobIS2, $jobBS, $jobT);
@@ -1289,10 +1291,10 @@ subtest "SAP setup - issue 52928" => sub {
     $settingsH{TEST}             = 'final';
     $settingsH{START_AFTER_TEST} = 'supportserver,hdd_gnome';
 
-    my $jobA = _job_create_set_done(\%settingsA, OpenQA::Jobs::Constants::DONE);
+    my $jobA = _job_create_set_done(\%settingsA, DONE);
     $settingsB{_START_AFTER_JOBS} = [$jobA->id];
     my $jobB = _job_create(\%settingsB);
-    my $jobC = _job_create_set_done(\%settingsC, OpenQA::Jobs::Constants::DONE);
+    my $jobC = _job_create_set_done(\%settingsC, DONE);
     $settingsD{_START_AFTER_JOBS} = [$jobC->id, $jobB->id];
     my $jobD = _job_create(\%settingsD);
     my $jobE = _job_create(\%settingsE);
diff --git a/t/05-scheduler-full.t b/t/05-scheduler-full.t
index 59bad17e09..9e9b6b56f7 100644
--- a/t/05-scheduler-full.t
+++ b/t/05-scheduler-full.t
@@ -90,30 +90,30 @@ sub create_worker {
 }
 
 subtest 'Scheduler worker job allocation' => sub {
-    # Step 1
-    my $allocated = scheduler_step();    # Will try to allocate to previous worker and fail!
+    note('try to allocate to previous worker (supposed to fail)');
+    my $allocated = scheduler_step();
     is @$allocated, 0;
 
+    note('starting two workers');
     my $w1_pid = create_worker($k->key, $k->secret, "http://localhost:$mojoport", 1);
     my $w2_pid = create_worker($k->key, $k->secret, "http://localhost:$mojoport", 2);
     wait_for_worker($schema, 3);
     wait_for_worker($schema, 4);
 
-    ($allocated) = scheduler_step();     # Will try to allocate to previous worker and fail!
-
-    my $job_id1 = $allocated->[0]->{job};
-    my $job_id2 = $allocated->[1]->{job};
-    my $wr_id1  = $allocated->[0]->{worker};
-    my $wr_id2  = $allocated->[1]->{worker};
-    ok $wr_id1 != $wr_id2,   "Jobs dispatched to different workers";
-    ok $job_id1 != $job_id2, "Jobs dispatched to different workers";
-
+    note('assigning one job to each worker');
+    ($allocated) = scheduler_step();
+    my $job_id1           = $allocated->[0]->{job};
+    my $job_id2           = $allocated->[1]->{job};
+    my $wr_id1            = $allocated->[0]->{worker};
+    my $wr_id2            = $allocated->[1]->{worker};
+    my $different_workers = isnt($wr_id1, $wr_id2, 'jobs dispatched to different workers');
+    my $different_jobs    = isnt($job_id1, $job_id2, 'each of the two jobs allocated to one of the workers');
+    diag explain $allocated unless $different_workers && $different_jobs;
 
     ($allocated) = scheduler_step();
     is @$allocated, 0;
 
     kill_service($_, 1) for ($w1_pid, $w2_pid);
-
     dead_workers($schema);
 };