From 23ab6439266d6cb52eba3b1a455f3813e94b450c Mon Sep 17 00:00:00 2001 From: Adam Williamson Date: Feb 05 2020 18:42:36 +0000 Subject: Backport #2667 to fix a scheduling race causing incomplete jobs --- diff --git a/2667.patch b/2667.patch new file mode 100644 index 0000000..ada1566 --- /dev/null +++ b/2667.patch @@ -0,0 +1,1027 @@ +From f3ea708082fac95684ca5f925bb356ced1c1705e Mon Sep 17 00:00:00 2001 +From: Marius Kittler +Date: Wed, 15 Jan 2020 18:59:18 +0100 +Subject: [PATCH 1/4] Avoid race condition in job scheduling + +* Associate workers and jobs before sending the job to the worker +* Consider all unfinished jobs in stale job handling on worker + status updates via web sockets + +This should help to avoid the problems mentioned in the ticket +https://progress.opensuse.org/issues/62015. +--- + lib/OpenQA/Resource/Jobs.pm | 25 +++---- + lib/OpenQA/Scheduler/Model/Jobs.pm | 80 ++++++++++------------ + lib/OpenQA/Schema/Result/Jobs.pm | 35 +++------- + lib/OpenQA/Schema/Result/Workers.pm | 5 ++ + lib/OpenQA/WebSockets/Controller/Worker.pm | 44 +++++++----- + t/04-scheduler.t | 5 +- + 6 files changed, 90 insertions(+), 104 deletions(-) + +diff --git a/lib/OpenQA/Resource/Jobs.pm b/lib/OpenQA/Resource/Jobs.pm +index 4fcad94924..94096bd983 100644 +--- a/lib/OpenQA/Resource/Jobs.pm ++++ b/lib/OpenQA/Resource/Jobs.pm +@@ -42,41 +42,32 @@ or done. Scheduled jobs can't be restarted. + sub job_restart { + my ($jobids) = @_ or die "missing name parameter\n"; + +- # first, duplicate all jobs that are either running or done ++ # duplicate all jobs that are either running or done + my $schema = OpenQA::Schema->singleton; + my $jobs = $schema->resultset("Jobs")->search( + { + id => $jobids, + state => [OpenQA::Jobs::Constants::EXECUTION_STATES, OpenQA::Jobs::Constants::FINAL_STATES], + }); +- + my @duplicated; + while (my $j = $jobs->next) { + my $dup = $j->auto_duplicate; + push @duplicated, $dup->{cluster_cloned} if $dup; + } + +- # then tell workers to abort +- $jobs = $schema->resultset("Jobs")->search( ++ # abort running jobs ++ my $running_jobs = $schema->resultset("Jobs")->search( + { + id => $jobids, + state => [OpenQA::Jobs::Constants::EXECUTION_STATES], + }); +- +- $jobs->search( +- { +- result => OpenQA::Jobs::Constants::NONE, +- } +- )->update( +- { +- result => OpenQA::Jobs::Constants::USER_RESTARTED, +- }); +- +- while (my $j = $jobs->next) { ++ $running_jobs->search({result => OpenQA::Jobs::Constants::NONE}) ++ ->update({result => OpenQA::Jobs::Constants::USER_RESTARTED}); ++ while (my $j = $running_jobs->next) { + $j->calculate_blocked_by; +- log_debug("enqueuing abort for " . $j->id . " " . $j->worker_id); +- $j->worker->send_command(command => 'abort', job_id => $j->id); ++ $j->abort; + } ++ + return @duplicated; + } + +diff --git a/lib/OpenQA/Scheduler/Model/Jobs.pm b/lib/OpenQA/Scheduler/Model/Jobs.pm +index 100450a409..1edc3f0880 100644 +--- a/lib/OpenQA/Scheduler/Model/Jobs.pm ++++ b/lib/OpenQA/Scheduler/Model/Jobs.pm +@@ -199,13 +199,33 @@ sub schedule { + next; + } + ++ # assign the jobs to the worker and then send the jobs to the worker ++ # note: The $worker->update(...) is also done when the worker sends a status update. That is ++ # required to track the worker's current job when assigning multiple jobs to it. We still ++ # need to set it here immediately to be sure the scheduler does not consider the worker ++ # free anymore. + my $res; + try { + if ($actual_job_count > 1) { ++ my %worker_assignment = ( ++ state => ASSIGNED, ++ t_started => undef, ++ assigned_worker_id => $worker_id, ++ ); ++ $schema->txn_do( ++ sub { ++ $_->update(\%worker_assignment) for @jobs; ++ $worker->set_current_job($jobs[0]); ++ }); + $res + = $self->_assign_multiple_jobs_to_worker(\@jobs, $worker, $directly_chained_job_sequence, $job_ids); + } + else { ++ $schema->txn_do( ++ sub { ++ $jobs[0]->set_assigned_worker($worker); ++ $worker->set_current_job($jobs[0]); ++ }); + $res = $jobs[0]->ws_send($worker); + } + die "Failed contacting websocket server over HTTP" unless ref($res) eq "HASH" && exists $res->{state}; +@@ -214,56 +234,30 @@ sub schedule { + log_debug("Failed to send data to websocket, reason: $_"); + }; + +- if (ref($res) eq "HASH" && $res->{state}->{msg_sent} == 1) { ++ if (ref($res) eq 'HASH' && $res->{state} && $res->{state}->{msg_sent} == 1) { + log_debug("Sent job(s) '$job_ids_str' to worker '$worker_id'"); ++ push(@successfully_allocated, map { {job => $_, worker => $worker_id} } @$job_ids); ++ next; ++ } + +- # associate the worker to the job, so the worker can send updates +- try { +- if ($actual_job_count > 1) { +- my %worker_assignment = ( +- state => ASSIGNED, +- t_started => undef, +- assigned_worker_id => $worker_id, +- ); +- $_->update(\%worker_assignment) for @jobs; +- $worker->update({job_id => $first_job_id}); +- # note: The job_id column of the workers table is updated as soon as the worker progresses +- # to the next job so the actually current job and current module can be displayed. +- } +- else { +- if ($jobs[0]->set_assigned_worker($worker)) { +- push(@successfully_allocated, {job => $first_job_id, worker => $worker_id}); +- } +- else { +- # Send abort and reschedule if we fail associating the job to the worker +- $jobs[0]->reschedule_rollback($worker); +- } +- } +- } +- catch { +- log_debug("Failed to set worker in scheduling state, reason: $_"); +- }; ++ # reset worker and jobs on failure ++ log_debug("Failed sending job(s) '$job_ids_str' to worker '$worker_id'"); ++ try { ++ $schema->txn_do(sub { $worker->unprepare_for_work; }); + } +- else { +- # reset worker and jobs on failure +- log_debug("Failed sending job(s) '$job_ids_str' to worker '$worker_id'"); ++ catch { ++ log_debug("Failed resetting unprepare worker, reason: $_"); ++ }; ++ for my $job (@jobs) { + try { +- $worker->unprepare_for_work; ++ # remove the associated worker and be sure to be in scheduled state. ++ $schema->txn_do(sub { $job->reschedule_state; }); + } + catch { +- log_debug("Failed resetting unprepare worker, reason: $_"); ++ # if we see this, we are in a really bad state ++ my $job_id = $job->id; ++ log_debug("Failed resetting job '$job_id' to scheduled state, reason: $_"); + }; +- for my $job (@jobs) { +- try { +- # Remove the associated worker and be sure to be in scheduled state. +- $job->reschedule_state; +- } +- catch { +- # Again: If we see this, we are in a really bad state. +- my $job_id = $job->id; +- log_debug("Failed resetting job '$job_id' to scheduled state, reason: $_"); +- }; +- } + } + } + +diff --git a/lib/OpenQA/Schema/Result/Jobs.pm b/lib/OpenQA/Schema/Result/Jobs.pm +index 23c2de6317..1302743642 100644 +--- a/lib/OpenQA/Schema/Result/Jobs.pm ++++ b/lib/OpenQA/Schema/Result/Jobs.pm +@@ -343,34 +343,18 @@ sub reschedule_state { + } + } + +-sub reschedule_rollback { +- my ($self, $worker) = @_; +- $self->scheduler_abort($worker) +- ; # TODO: This might become a problem if we have duplicated job IDs from 2 or more web UIs. +- # Workers should be able to kill a job checking the (job token + job id) instead. +- $self->reschedule_state; +-} +- + sub set_assigned_worker { + my ($self, $worker) = @_; +- return 0 unless $worker; + ++ my $job_id = $self->id; ++ my $worker_id = $worker->id; + $self->update( + { + state => ASSIGNED, + t_started => undef, +- assigned_worker_id => $worker->id, ++ assigned_worker_id => $worker_id, + }); +- +- $worker->update({job_id => $self->id}); +- +- if ($worker->job->id eq $self->id) { +- log_debug("Job '" . $self->id . "' has worker '" . $worker->id . "' assigned"); +- return 1; +- } +- else { +- return 0; +- } ++ log_debug("Job '$job_id' has worker '$worker_id' assigned"); + } + + sub prepare_for_work { +@@ -925,10 +909,13 @@ sub _cluster_cloned { + } + + sub abort { +- my $self = shift; +- return unless $self->worker; +- log_debug("[Job#" . $self->id . "] Sending abort command"); +- $self->worker->send_command(command => 'abort', job_id => $self->id); ++ my $self = shift; ++ my $worker = $self->worker; ++ return undef unless $worker; ++ ++ my ($job_id, $worker_id) = ($self->id, $worker->id); ++ log_debug("Sending abort command to worker $worker_id for job $job_id"); ++ $worker->send_command(command => 'abort', job_id => $job_id); + } + + sub scheduler_abort { +diff --git a/lib/OpenQA/Schema/Result/Workers.pm b/lib/OpenQA/Schema/Result/Workers.pm +index 929b21e751..a66f23b11b 100644 +--- a/lib/OpenQA/Schema/Result/Workers.pm ++++ b/lib/OpenQA/Schema/Result/Workers.pm +@@ -272,4 +272,9 @@ sub unfinished_jobs { + return $self->previous_jobs->search({t_finished => undef}); + } + ++sub set_current_job { ++ my ($self, $job) = @_; ++ $self->update({job_id => $job->id}); ++} ++ + 1; +diff --git a/lib/OpenQA/WebSockets/Controller/Worker.pm b/lib/OpenQA/WebSockets/Controller/Worker.pm +index 5fe98ec880..10eff009ba 100644 +--- a/lib/OpenQA/WebSockets/Controller/Worker.pm ++++ b/lib/OpenQA/WebSockets/Controller/Worker.pm +@@ -147,51 +147,59 @@ sub _message { + log_debug("Could not send the population number to worker: $_"); + }; + +-# find the job currently associated with that worker and check whether the worker still executes the job it is supposed to ++ # find the job currently associated with that worker and check whether the worker still ++ # executes the job it is supposed to + try { + my $worker = $schema->resultset('Workers')->find($wid); + return undef unless $worker; + +- my $registered_job_id; ++ my $current_job_id; + my $registered_job_token; +- my $registered_job_state; +- my $registered_job = $worker->job; +- if ($registered_job) { +- $registered_job_id = $registered_job->id; +- $registered_job_state = $registered_job->state; ++ my $current_job_state; ++ my @unfinished_jobs = $worker->unfinished_jobs; ++ my $current_job = $worker->job // $unfinished_jobs[0]; ++ if ($current_job) { ++ $current_job_id = $current_job->id; ++ $current_job_state = $current_job->state; + } + + # log debugging info +- log_debug("Found job $registered_job_id in DB from worker_status update sent by worker $wid") +- if defined $registered_job_id; ++ log_debug("Found job $current_job_id in DB from worker_status update sent by worker $wid") ++ if defined $current_job_id; + log_debug("Received request has job id: $job_id") + if defined $job_id; + $registered_job_token = $worker->get_property('JOBTOKEN'); +- log_debug("Worker $wid for job $registered_job_id has token $registered_job_token") +- if defined $registered_job_id && defined $registered_job_token; ++ log_debug("Worker $wid for job $current_job_id has token $registered_job_token") ++ if defined $current_job_id && defined $registered_job_token; + log_debug("Received request has token: $job_token") + if defined $job_token; + +- # skip any further actions if worker just does the job we expected it to do ++ # skip any further actions if worker just does the one job we expected it to do + return undef + if ( defined $job_id +- && defined $registered_job_id ++ && defined $current_job_id + && defined $job_token + && defined $registered_job_token +- && $job_id eq $registered_job_id ++ && $job_id eq $current_job_id + && (my $job_token_correct = $job_token eq $registered_job_token) +- && OpenQA::Jobs::Constants::meta_state($registered_job_state) eq OpenQA::Jobs::Constants::EXECUTION); ++ && OpenQA::Jobs::Constants::meta_state($current_job_state) eq OpenQA::Jobs::Constants::EXECUTION) ++ && (scalar @unfinished_jobs <= 1); + + # handle the case when the worker does not work on the job(s) it is supposed to work on +- my @all_jobs_currently_associated_with_worker = ($registered_job, $worker->unfinished_jobs); ++ my @all_jobs_currently_associated_with_worker = ($current_job, @unfinished_jobs); ++ my %considered_jobs; + for my $associated_job (@all_jobs_currently_associated_with_worker) { + next unless defined $associated_job; + ++ # prevent doing this twice for the same job ($current_job and @unfinished_jobs might overlap) ++ my $job_id = $associated_job->id; ++ next if exists $considered_jobs{$job_id}; ++ $considered_jobs{$job_id} = 1; ++ + # do nothing if the job token is corrent and the worker claims that it is still working on that job + # or that the job is still pending + if ($job_token_correct) { +- my $job_id = $associated_job->id; +- next if $job_id eq $registered_job_id; ++ next if $job_id eq $current_job_id; + next if exists $pending_job_ids->{$job_id}; + } + +diff --git a/t/04-scheduler.t b/t/04-scheduler.t +index b62ebfcd62..cf086fb2a9 100644 +--- a/t/04-scheduler.t ++++ b/t/04-scheduler.t +@@ -332,8 +332,9 @@ is(scalar(@{$rjobs_before}) + 1, scalar(@{$rjobs_after}), "number of + is($rjobs_after->[-1]->{assigned_worker_id}, 1, 'assigned worker set'); + + $grabbed = job_get($job->id); +-is($grabbed->worker->id, $worker->{id}, 'correct worker assigned'); +-is($grabbed->state, ASSIGNED, 'job is in assigned state'); ++is($grabbed->assigned_worker_id, $worker->{id}, 'worker assigned to job'); ++is($grabbed->worker->id, $worker->{id}, 'job assigned to worker'); ++is($grabbed->state, ASSIGNED, 'job is in assigned state'); + + # register worker again with no job while the web UI thinks it has an assigned job + is(register_worker, $id, 'worker re-registered'); + +From 187a5ed85fdc8ac350a8b0141643c8d3acb9e081 Mon Sep 17 00:00:00 2001 +From: Marius Kittler +Date: Thu, 16 Jan 2020 15:15:57 +0100 +Subject: [PATCH 2/4] Simplify auto_duplicate code + +--- + lib/OpenQA/Schema/Result/Jobs.pm | 41 ++++++++++++-------------------- + t/05-scheduler-full.t | 6 ++++- + 2 files changed, 20 insertions(+), 27 deletions(-) + +diff --git a/lib/OpenQA/Schema/Result/Jobs.pm b/lib/OpenQA/Schema/Result/Jobs.pm +index 1302743642..c8cb835519 100644 +--- a/lib/OpenQA/Schema/Result/Jobs.pm ++++ b/lib/OpenQA/Schema/Result/Jobs.pm +@@ -853,47 +853,35 @@ sub auto_duplicate { + # set this clone was triggered by manually if it's not auto-clone + $args->{dup_type_auto} //= 0; + ++ my $job_id = $self->id; + my $clones = $self->duplicate($args); +- +- unless ($clones) { +- log_debug('duplication failed'); +- return; ++ if (!$clones) { ++ log_debug("Duplication of job $job_id failed"); ++ return undef; + } ++ + # abort jobs in the old cluster (exclude the original $args->{jobid}) + my $rsource = $self->result_source; + my $jobs = $rsource->schema->resultset("Jobs")->search( + { +- id => {'!=' => $self->id, '-in' => [keys %$clones]}, ++ id => {'!=' => $job_id, '-in' => [keys %$clones]}, + state => [PRE_EXECUTION_STATES, EXECUTION_STATES], + }); + +- $jobs->search( +- { +- result => NONE, +- } +- )->update( +- { +- result => PARALLEL_RESTARTED, +- }); ++ $jobs->search({result => NONE})->update({result => PARALLEL_RESTARTED}); + + while (my $j = $jobs->next) { +- if ($j->worker) { +- log_debug("enqueuing abort for " . $j->id . " " . $j->worker_id); +- $j->worker->send_command(command => 'abort', job_id => $j->id); +- } +- else { +- if ($j->state eq SCHEDULED) { +- $j->release_networks; +- $j->update({state => CANCELLED}); +- } +- } ++ next if $j->abort; ++ next unless $j->state eq SCHEDULED; ++ $j->release_networks; ++ $j->update({state => CANCELLED}); + } + +- log_debug('new job ' . $clones->{$self->id}->{clone}); ++ log_debug('new job ' . $clones->{$job_id}->{clone}); + + # Attach all clones mapping to new job object + # TODO: better return a proper hash here +- my $dup = $rsource->resultset->find($clones->{$self->id}->{clone}); ++ my $dup = $rsource->resultset->find($clones->{$job_id}->{clone}); + $dup->_cluster_cloned($clones); + return $dup; + } +@@ -911,11 +899,12 @@ sub _cluster_cloned { + sub abort { + my $self = shift; + my $worker = $self->worker; +- return undef unless $worker; ++ return 0 unless $worker; + + my ($job_id, $worker_id) = ($self->id, $worker->id); + log_debug("Sending abort command to worker $worker_id for job $job_id"); + $worker->send_command(command => 'abort', job_id => $job_id); ++ return 1; + } + + sub scheduler_abort { +diff --git a/t/05-scheduler-full.t b/t/05-scheduler-full.t +index 0a8d95ab92..59bad17e09 100644 +--- a/t/05-scheduler-full.t ++++ b/t/05-scheduler-full.t +@@ -199,7 +199,11 @@ subtest 'Simulation of heavy unstable load' => sub { + dead_workers($schema); + my @duplicated; + +- push(@duplicated, $_->auto_duplicate()) for $schema->resultset("Jobs")->latest_jobs; ++ # duplicate latest jobs ignoring failures ++ for my $job ($schema->resultset('Jobs')->latest_jobs) { ++ my $duplicate = $job->auto_duplicate; ++ push(@duplicated, $duplicate) if defined $duplicate; ++ } + + push(@workers, unresponsive_worker($k->key, $k->secret, "http://localhost:$mojoport", $_)) for (1 .. 50); + my $i = 4; + +From 149e6423293562a03a471429b149895e72269ce2 Mon Sep 17 00:00:00 2001 +From: Marius Kittler +Date: Thu, 16 Jan 2020 15:18:46 +0100 +Subject: [PATCH 3/4] Consider assigned jobs when duplicating jobs + +When aborting jobs in the old cluster, treat assigned +jobs in the same way as scheduled jobs if the worker +hasn't accepted the job yet. +--- + lib/OpenQA/Schema/Result/Jobs.pm | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/lib/OpenQA/Schema/Result/Jobs.pm b/lib/OpenQA/Schema/Result/Jobs.pm +index c8cb835519..06c0213f71 100644 +--- a/lib/OpenQA/Schema/Result/Jobs.pm ++++ b/lib/OpenQA/Schema/Result/Jobs.pm +@@ -872,7 +872,7 @@ sub auto_duplicate { + + while (my $j = $jobs->next) { + next if $j->abort; +- next unless $j->state eq SCHEDULED; ++ next unless $j->state eq SCHEDULED || $j->state eq ASSIGNED; + $j->release_networks; + $j->update({state => CANCELLED}); + } + +From 127a9c0e380d10dc80f2ea74d57e2aa01504d1a3 Mon Sep 17 00:00:00 2001 +From: Marius Kittler +Date: Thu, 16 Jan 2020 15:21:54 +0100 +Subject: [PATCH 4/4] Improve coding style in scheduler tests + +--- + t/05-scheduler-capabilities.t | 20 ++-- + t/05-scheduler-dependencies.t | 176 +++++++++++++++++----------------- + t/05-scheduler-full.t | 24 ++--- + 3 files changed, 107 insertions(+), 113 deletions(-) + +diff --git a/t/05-scheduler-capabilities.t b/t/05-scheduler-capabilities.t +index 636fe9a990..886da525f5 100644 +--- a/t/05-scheduler-capabilities.t ++++ b/t/05-scheduler-capabilities.t +@@ -24,14 +24,14 @@ use lib "$FindBin::Bin/lib"; + use OpenQA::Scheduler::Model::Jobs; + use OpenQA::Constants 'WEBSOCKET_API_VERSION'; + use OpenQA::Test::Database; ++use OpenQA::WebAPI::Controller::API::V1::Worker; + use Test::Mojo; + use Test::More; + use Test::Warnings; + use Mojo::Util 'monkey_patch'; + +-my $schema = OpenQA::Test::Database->new->create; #(skip_fixtures => 1); +- +-my $sent = {}; ++my $schema = OpenQA::Test::Database->new->create; ++my $sent = {}; + + OpenQA::Scheduler::Model::Jobs->singleton->shuffle_workers(0); + +@@ -55,18 +55,13 @@ monkey_patch 'OpenQA::Schema::Result::Jobs', ws_send => sub { + return {state => {msg_sent => 1}}; + }; + +- +-#my $t = Test::Mojo->new('OpenQA::WebAPI'); +- + sub list_jobs { + my %args = @_; + [map { $_->to_hash(assets => 1) } $schema->resultset('Jobs')->complex_query(%args)->all]; + } + +-my $current_jobs = list_jobs(); +-#diag explain $current_jobs; +- +-my %settings = ( ++my $current_jobs = list_jobs; ++my %settings = ( + DISTRI => 'Unicorn', + FLAVOR => 'pink', + VERSION => '42', +@@ -174,9 +169,7 @@ $jobH->set_prio(8); + $jobI->set_prio(10); + $jobJ->set_prio(9); + +-use OpenQA::WebAPI::Controller::API::V1::Worker; +-my $c = OpenQA::WebAPI::Controller::API::V1::Worker->new; +- ++my $c = OpenQA::WebAPI::Controller::API::V1::Worker->new; + my $w1_id = $c->_register($schema, "host", "1", \%workercaps64_client); + my $w2_id = $c->_register($schema, "host", "2", \%workercaps64_server); + my $w3_id = $c->_register($schema, "host", "3", \%workercaps32); +@@ -220,7 +213,6 @@ is($job->{id}, $jobJ->id, + $job = $sent->{$w9_id}->{job}->to_hash; + is($job->{id}, $jobI->id, "this worker can do jobI, child - client"); + +- + # job G is not grabbed because there is no worker with class 'special' + + done_testing(); +diff --git a/t/05-scheduler-dependencies.t b/t/05-scheduler-dependencies.t +index 8437438f29..1b74aeb6f7 100644 +--- a/t/05-scheduler-dependencies.t ++++ b/t/05-scheduler-dependencies.t +@@ -1,6 +1,6 @@ + #!/usr/bin/env perl -w + +-# Copyright (C) 2014-2019 SUSE LLC ++# Copyright (C) 2014-2020 SUSE LLC + # + # This program is free software; you can redistribute it and/or modify + # it under the terms of the GNU General Public License as published by +@@ -169,7 +169,7 @@ OpenQA::Scheduler::Model::Jobs->singleton->shuffle_workers(0); + sub schedule { + my $id = OpenQA::Scheduler::Model::Jobs->singleton->schedule(); + for my $i (@$id) { +- _jobs_update_state([$schema->resultset('Jobs')->find($i->{job})], OpenQA::Jobs::Constants::RUNNING); ++ _jobs_update_state([$schema->resultset('Jobs')->find($i->{job})], RUNNING); + } + } + +@@ -180,7 +180,7 @@ $jobs_result_mock->mock( + ws_send => sub { + my ($self, $worker) = @_; + my $hashref = $self->prepare_for_work($worker); +- _jobs_update_state([$self], OpenQA::Jobs::Constants::RUNNING); ++ _jobs_update_state([$self], RUNNING); + + $hashref->{assigned_worker_id} = $worker->id; + $sent->{$worker->id} = {worker => $worker, job => $self, jobhash => $hashref}; +@@ -427,35 +427,37 @@ $result = $jobE->done(result => 'incomplete'); + is($result, 'incomplete', 'job_set_done on E'); + + my $job = job_get_deps($jobA->id); +-is($job->{state}, "done", "job_set_done changed state"); +-is($job->{result}, "failed", "job_set_done changed result"); ++is($job->{state}, DONE, 'job_set_done changed state'); ++is($job->{result}, FAILED, 'job_set_done changed result'); + + $job = job_get_deps($jobB->id); +-is($job->{state}, "running", "job_set_done changed state"); ++is($job->{state}, RUNNING, 'job_set_done changed state'); + + $job = job_get_deps($jobC->id); +-is($job->{state}, "running", "job_set_done changed state"); ++is($job->{state}, RUNNING, 'job_set_done changed state'); + + $job = job_get_deps($jobD->id); +-is($job->{state}, "done", "job_set_done changed state"); +-is($job->{result}, "parallel_failed", "job_set_done changed result, jobD failed because of jobA"); ++is($job->{state}, DONE, 'job_set_done changed state'); ++is($job->{result}, PARALLEL_FAILED, 'job_set_done changed result, jobD failed because of jobA'); + + $job = job_get_deps($jobE->id); +-is($job->{state}, "done", "job_set_done changed state"); +-is($job->{result}, "parallel_failed", "job_set_done changed result, jobE failed because of jobD"); ++is($job->{state}, DONE, 'job_set_done changed state'); ++is($job->{result}, PARALLEL_FAILED, 'job_set_done changed result, jobE failed because of jobD'); + + $jobF->discard_changes; + $job = job_get_deps($jobF->id); +-is($job->{state}, "running", "job_set_done changed state"); ++is($job->{state}, RUNNING, 'job_set_done changed state'); + + # check MM API for children status - available only for running jobs + my $worker = $schema->resultset("Workers")->find($worker_ids[1]); + my $t = Test::Mojo->new('OpenQA::WebAPI'); + ++my $job_token = $sent->{job}->{$jobC->id}->{worker}->get_property('JOBTOKEN'); ++isnt($job_token, undef, 'JOBTOKEN is present'); + $t->ua->on( + start => sub { + my ($ua, $tx) = @_; +- $tx->req->headers->add('X-API-JobToken' => $sent->{job}->{$jobC->id}->{worker}->get_property('JOBTOKEN')); ++ $tx->req->headers->add('X-API-JobToken' => $job_token); + }); + $t->get_ok('/api/v1/mm/children/running')->status_is(200)->json_is('/jobs' => [$jobF->id]) + ->or(sub { diag explain $t->tx->res->content }); +@@ -469,47 +471,47 @@ my $id = $jobF->auto_duplicate; + ok(defined $id, "duplicate works"); + + $job = job_get_deps($jobA->id); # cloned +-is($job->{state}, "done", "no change"); +-is($job->{result}, "failed", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, DONE, 'no change'); ++is($job->{result}, FAILED, 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + + $job = job_get_deps($jobB->id); # cloned + is($job->{result}, "parallel_failed", "$job->{id} B stopped"); +-ok(defined $job->{clone_id}, "cloned"); ++ok(defined $job->{clone_id}, 'cloned'); + my $jobB2 = $job->{clone_id}; + + $job = job_get_deps($jobC->id); # cloned +-is($job->{state}, "running", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, RUNNING, 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + my $jobC2 = $job->{clone_id}; + + $job = job_get_deps($jobD->id); # cloned +-is($job->{state}, "done", "no change"); +-is($job->{result}, "parallel_failed", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, DONE, 'no change'); ++is($job->{result}, "parallel_failed", 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + + $job = job_get_deps($jobE->id); # cloned +-is($job->{state}, "done", "no change"); +-is($job->{result}, "parallel_failed", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, DONE, 'no change'); ++is($job->{result}, "parallel_failed", 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + + $job = job_get_deps($jobF->id); # cloned +-is($job->{state}, "running", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, RUNNING, 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + my $jobF2 = $job->{clone_id}; + + $job = job_get_deps($jobB2); +-is($job->{state}, "scheduled", "cloned jobs are scheduled"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, "cloned jobs are scheduled"); ++is($job->{clone_id}, undef, 'no clones'); + + $job = job_get_deps($jobC2); +-is($job->{state}, "scheduled", "cloned jobs are scheduled"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, "cloned jobs are scheduled"); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply($job->{parents}, {Parallel => [$jobB2], Chained => [], 'Directly chained' => []}, "cloned deps"); + + $job = job_get_deps($jobF2); +-is($job->{state}, "scheduled", "cloned jobs are scheduled"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, "cloned jobs are scheduled"); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply($job->{parents}, {Parallel => [$jobC2], Chained => [], 'Directly chained' => []}, "cloned deps"); + + # recheck that cloning didn't change MM API results children status +@@ -518,65 +520,65 @@ $t->get_ok('/api/v1/mm/children/scheduled')->status_is(200)->json_is('/jobs' => + $t->get_ok('/api/v1/mm/children/done')->status_is(200)->json_is('/jobs' => [$jobE->id]); + + $job = job_get_deps($jobA->id); # cloned +-is($job->{state}, "done", "no change"); +-is($job->{result}, "failed", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, DONE, 'no change'); ++is($job->{result}, FAILED, 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + my $jobA2 = $job->{clone_id}; + + $job = job_get_deps($jobB->id); # unchanged + is($job->{result}, "parallel_failed", "B is unchanged"); +-is($job->{clone_id}, $jobB2, "cloned"); ++is($job->{clone_id}, $jobB2, 'cloned'); + + $job = job_get_deps($jobC->id); # unchanged +-is($job->{state}, "running", "no change"); ++is($job->{state}, RUNNING, 'no change'); + is($job->{result}, "parallel_failed", "C is restarted"); +-is($job->{clone_id}, $jobC2, "cloned"); ++is($job->{clone_id}, $jobC2, 'cloned'); + + $job = job_get_deps($jobD->id); #cloned +-is($job->{state}, "done", "no change"); +-is($job->{result}, "parallel_failed", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, DONE, 'no change'); ++is($job->{result}, "parallel_failed", 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + my $jobD2 = $job->{clone_id}; + + $job = job_get_deps($jobE->id); #cloned +-is($job->{state}, "done", "no change"); +-is($job->{result}, "parallel_failed", "no change"); +-ok(defined $job->{clone_id}, "cloned"); ++is($job->{state}, DONE, 'no change'); ++is($job->{result}, "parallel_failed", 'no change'); ++ok(defined $job->{clone_id}, 'cloned'); + my $jobE2 = $job->{clone_id}; + + $job = job_get_deps($jobF->id); # unchanged +-is($job->{state}, "running", "no change"); +-is($job->{clone_id}, $jobF2, "cloned"); ++is($job->{state}, RUNNING, 'no change'); ++is($job->{clone_id}, $jobF2, 'cloned'); + + $job = job_get_deps($jobA2); +-is($job->{state}, "scheduled", "no change"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, 'no change'); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply($job->{parents}, {Parallel => [], Chained => [], 'Directly chained' => []}, "cloned deps"); + + $job = job_get_deps($jobB2); +-is($job->{state}, "scheduled", "no change"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, 'no change'); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply($job->{parents}, {Parallel => [], Chained => [], 'Directly chained' => []}, "cloned deps"); + + $job = job_get_deps($jobC2); +-is($job->{state}, "scheduled", "no change"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, 'no change'); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply($job->{parents}, {Parallel => [$jobB2], Chained => [], 'Directly chained' => []}, "cloned deps"); + + + $job = job_get_deps($jobD2); +-is($job->{state}, "scheduled", "no change"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, 'no change'); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply($job->{parents}, {Parallel => [$jobA2], Chained => [], 'Directly chained' => []}, "cloned deps"); + + $job = job_get_deps($jobE2); +-is($job->{state}, "scheduled", "no change"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, 'no change'); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply([sort @{$job->{parents}->{Parallel}}], [sort ($jobC2, $jobD2)], "cloned deps"); + + $job = job_get_deps($jobF2); +-is($job->{state}, "scheduled", "no change"); +-is($job->{clone_id}, undef, "no clones"); ++is($job->{state}, SCHEDULED, 'no change'); ++is($job->{clone_id}, undef, 'no clones'); + is_deeply($job->{parents}, {Parallel => [$jobC2], Chained => [], 'Directly chained' => []}, "cloned deps"); + + # now we have: +@@ -614,7 +616,7 @@ $t->get_ok('/api/v1/mm/children/done')->status_is(200)->json_is('/jobs' => [$job + # We have 3 free workers (as B,C and F are still running) + # and the cluster is 6, so we expect nothing to be SCHEDULED + schedule(); +-is(job_get_deps($jobA2)->{state}, "scheduled", "no change"); ++is(job_get_deps($jobA2)->{state}, SCHEDULED, 'job still scheduled'); + + # now free two of them and create one more worker. So that we + # have 6 free, but have vlan 1 still busy +@@ -686,8 +688,8 @@ is_deeply( + # when Y is scheduled and X is duplicated, Y must be cancelled and Y2 needs to depend on X2 + my $jobX2 = $jobX->auto_duplicate; + $jobY->discard_changes; +-is($jobY->state, OpenQA::Jobs::Constants::CANCELLED, 'jobY was cancelled'); +-is($jobY->result, OpenQA::Jobs::Constants::PARALLEL_RESTARTED, 'jobY was skipped'); ++is($jobY->state, CANCELLED, 'jobY was cancelled'); ++is($jobY->result, PARALLEL_RESTARTED, 'jobY was skipped'); + my $jobY2 = $jobY->clone; + ok(defined $jobY2, "jobY was cloned too"); + is($jobY2->blocked_by_id, $jobX2->id, "JobY2 is blocked"); +@@ -771,7 +773,7 @@ my $jobJ = _job_create(\%settingsJ, [$jobH->id]); + my $jobL = _job_create(\%settingsL, [$jobJ->id]); + + # hack jobs to appear running to scheduler +-_jobs_update_state([$jobH, $jobJ, $jobK, $jobL], OpenQA::Jobs::Constants::RUNNING); ++_jobs_update_state([$jobH, $jobJ, $jobK, $jobL], RUNNING); + + # expected output after cloning D, all jobs scheduled + # H2 <-(parallel) J2 +@@ -835,8 +837,8 @@ is($jobW->blocked_by_id, $jobQ->id, 'JobW is blocked by job supposed to run bef + # later. Neverthless, let's explicitly assert this behavior so we know what we have right now. + + # hack jobs to appear to scheduler in desired state +-_jobs_update_state([$jobQ], OpenQA::Jobs::Constants::DONE); +-_jobs_update_state([$jobW, $jobU, $jobR, $jobT, $jobTA], OpenQA::Jobs::Constants::RUNNING); ++_jobs_update_state([$jobQ], DONE); ++_jobs_update_state([$jobW, $jobU, $jobR, $jobT, $jobTA], RUNNING); + + # duplicate job U + # expected state (excluding TA2 which is just the same as T2 just directly chained to Q): +@@ -950,7 +952,7 @@ my $jobO = _job_create(\%settingsO, [$jobP->id]); + my $jobI = _job_create(\%settingsI, [$jobO->id]); + + # hack jobs to appear to scheduler in desired state +-_jobs_update_state([$jobP, $jobO, $jobI], OpenQA::Jobs::Constants::DONE); ++_jobs_update_state([$jobP, $jobO, $jobI], DONE); + + # cloning O gets to expected state + # +@@ -979,9 +981,9 @@ $jobO2 = $schema->resultset('Jobs')->search({id => $jobO2->{id}})->single; + $jobP2 = $schema->resultset('Jobs')->search({id => $jobP2->{id}})->single; + $jobI2 = $schema->resultset('Jobs')->search({id => $jobI2->{id}})->single; + # set P2 running and O2 done +-_jobs_update_state([$jobP2], OpenQA::Jobs::Constants::RUNNING); +-_jobs_update_state([$jobO2], OpenQA::Jobs::Constants::DONE); +-_jobs_update_state([$jobI2], OpenQA::Jobs::Constants::DONE); ++_jobs_update_state([$jobP2], RUNNING); ++_jobs_update_state([$jobO2], DONE); ++_jobs_update_state([$jobI2], DONE); + + # cloning I gets to expected state: + # P3 <-(parallel) O3 <-(parallel) I2 +@@ -1015,10 +1017,10 @@ $jobC = _job_create(\%settingsC, undef, [$jobA->id]); + $jobD = _job_create(\%settingsD, undef, [$jobA->id]); + + # hack jobs to appear done to scheduler +-_jobs_update_state([$jobA, $jobB, $jobC, $jobD], OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::PASSED); ++_jobs_update_state([$jobA, $jobB, $jobC, $jobD], DONE, PASSED); + + # only job B failed as incomplete +-$jobB->result(OpenQA::Jobs::Constants::INCOMPLETE); ++$jobB->result(INCOMPLETE); + $jobB->update; + + # situation, all chained and done, B is incomplete: +@@ -1052,7 +1054,7 @@ is_deeply($jobD_h->{parents}->{Chained}, [$jobA->id], 'jobD has jobA as chained + is($jobD_h->{settings}{TEST}, $jobD->TEST, 'jobBc test and jobB test are equal'); + + # hack jobs to appear running to scheduler +-$jobB->clone->state(OpenQA::Jobs::Constants::RUNNING); ++$jobB->clone->state(RUNNING); + $jobB->clone->update; + + # clone A +@@ -1062,7 +1064,7 @@ $jobA2 = $jobA->auto_duplicate; + ok($jobA2, 'jobA duplicated'); + $jobA->discard_changes; + +-$jobA->clone->state(OpenQA::Jobs::Constants::RUNNING); ++$jobA->clone->state(RUNNING); + $jobA->clone->update; + $jobA2 = $jobA->clone->auto_duplicate; + ok($jobA2, 'jobA->clone duplicated'); +@@ -1112,8 +1114,8 @@ $jobC = _job_create(\%settingsC, undef, [$jobA->id]); + $jobD = _job_create(\%settingsD, undef, [$jobA->id]); + + # hack jobs to appear done to scheduler +-_jobs_update_state([$jobA], OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::PASSED); +-_jobs_update_state([$jobB, $jobC, $jobD], OpenQA::Jobs::Constants::RUNNING); ++_jobs_update_state([$jobA], DONE, PASSED); ++_jobs_update_state([$jobB, $jobC, $jobD], RUNNING); + + $jobA2 = $jobA->auto_duplicate; + $_->discard_changes for ($jobA, $jobB, $jobC, $jobD); +@@ -1127,7 +1129,7 @@ for ($jobB, $jobC, $jobD) { + # set jobA2 as running and clone it + $jobA2 = $jobA->clone; + is($jobA2->id, $jobA2->id, 'jobA2 is indeed jobA clone'); +-$jobA2->state(OpenQA::Jobs::Constants::RUNNING); ++$jobA2->state(RUNNING); + $jobA2->update; + my $jobA3 = $jobA2->auto_duplicate; + ok($jobA3, "cloned A2"); +@@ -1158,8 +1160,8 @@ my $duplicate_test = sub { + $jobD = _job_create(\%settingsD, [$jobB->id], [$jobA->id]); + + # hack jobs to appear done to scheduler +- _jobs_update_state([$jobA], OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::PASSED); +- _jobs_update_state([$jobB, $jobC, $jobD], OpenQA::Jobs::Constants::DONE, OpenQA::Jobs::Constants::FAILED); ++ _jobs_update_state([$jobA], DONE, PASSED); ++ _jobs_update_state([$jobB, $jobC, $jobD], DONE, FAILED); + + $jobA2 = $jobA->auto_duplicate; + $_->discard_changes for ($jobA, $jobB, $jobC, $jobD); +@@ -1181,7 +1183,7 @@ sub _job_create_set_done { + my ($settings, $state) = @_; + my $job = _job_create($settings); + # hack jobs to appear done to scheduler +- _jobs_update_state([$job], $state, OpenQA::Jobs::Constants::PASSED); ++ _jobs_update_state([$job], $state, PASSED); + return $job; + } + +@@ -1221,23 +1223,23 @@ my $slepos_test_workers = sub { + $settingsT{TEST} = 'Terminal'; + + # Support server +- my $jobSUS = _job_create_set_done(\%settingsSUS, OpenQA::Jobs::Constants::DONE); ++ my $jobSUS = _job_create_set_done(\%settingsSUS, DONE); + # Admin Server 1 + $settingsAS{_PARALLEL_JOBS} = [$jobSUS->id]; +- my $jobAS = _job_create_set_done(\%settingsAS, OpenQA::Jobs::Constants::DONE); ++ my $jobAS = _job_create_set_done(\%settingsAS, DONE); + # Image server 2 + $settingsIS2{_START_AFTER_JOBS} = [$jobAS->id]; +- my $jobIS2 = _job_create_set_done(\%settingsIS2, OpenQA::Jobs::Constants::DONE); ++ my $jobIS2 = _job_create_set_done(\%settingsIS2, DONE); + # Image server + $settingsIS{_PARALLEL_JOBS} = [$jobSUS->id]; + $settingsIS{_START_AFTER_JOBS} = [$jobAS->id]; +- my $jobIS = _job_create_set_done(\%settingsIS, OpenQA::Jobs::Constants::CANCELLED); ++ my $jobIS = _job_create_set_done(\%settingsIS, CANCELLED); + # Branch server + $settingsBS{_PARALLEL_JOBS} = [$jobAS->id, $jobSUS->id]; +- my $jobBS = _job_create_set_done(\%settingsBS, OpenQA::Jobs::Constants::DONE); ++ my $jobBS = _job_create_set_done(\%settingsBS, DONE); + # Terminal + $settingsT{_PARALLEL_JOBS} = [$jobBS->id]; +- my $jobT = _job_create_set_done(\%settingsT, OpenQA::Jobs::Constants::DONE); ++ my $jobT = _job_create_set_done(\%settingsT, DONE); + # clone terminal + $jobT->duplicate; + $_->discard_changes for ($jobSUS, $jobAS, $jobIS, $jobIS2, $jobBS, $jobT); +@@ -1289,10 +1291,10 @@ subtest "SAP setup - issue 52928" => sub { + $settingsH{TEST} = 'final'; + $settingsH{START_AFTER_TEST} = 'supportserver,hdd_gnome'; + +- my $jobA = _job_create_set_done(\%settingsA, OpenQA::Jobs::Constants::DONE); ++ my $jobA = _job_create_set_done(\%settingsA, DONE); + $settingsB{_START_AFTER_JOBS} = [$jobA->id]; + my $jobB = _job_create(\%settingsB); +- my $jobC = _job_create_set_done(\%settingsC, OpenQA::Jobs::Constants::DONE); ++ my $jobC = _job_create_set_done(\%settingsC, DONE); + $settingsD{_START_AFTER_JOBS} = [$jobC->id, $jobB->id]; + my $jobD = _job_create(\%settingsD); + my $jobE = _job_create(\%settingsE); +diff --git a/t/05-scheduler-full.t b/t/05-scheduler-full.t +index 59bad17e09..9e9b6b56f7 100644 +--- a/t/05-scheduler-full.t ++++ b/t/05-scheduler-full.t +@@ -90,30 +90,30 @@ sub create_worker { + } + + subtest 'Scheduler worker job allocation' => sub { +- # Step 1 +- my $allocated = scheduler_step(); # Will try to allocate to previous worker and fail! ++ note('try to allocate to previous worker (supposed to fail)'); ++ my $allocated = scheduler_step(); + is @$allocated, 0; + ++ note('starting two workers'); + my $w1_pid = create_worker($k->key, $k->secret, "http://localhost:$mojoport", 1); + my $w2_pid = create_worker($k->key, $k->secret, "http://localhost:$mojoport", 2); + wait_for_worker($schema, 3); + wait_for_worker($schema, 4); + +- ($allocated) = scheduler_step(); # Will try to allocate to previous worker and fail! +- +- my $job_id1 = $allocated->[0]->{job}; +- my $job_id2 = $allocated->[1]->{job}; +- my $wr_id1 = $allocated->[0]->{worker}; +- my $wr_id2 = $allocated->[1]->{worker}; +- ok $wr_id1 != $wr_id2, "Jobs dispatched to different workers"; +- ok $job_id1 != $job_id2, "Jobs dispatched to different workers"; +- ++ note('assigning one job to each worker'); ++ ($allocated) = scheduler_step(); ++ my $job_id1 = $allocated->[0]->{job}; ++ my $job_id2 = $allocated->[1]->{job}; ++ my $wr_id1 = $allocated->[0]->{worker}; ++ my $wr_id2 = $allocated->[1]->{worker}; ++ my $different_workers = isnt($wr_id1, $wr_id2, 'jobs dispatched to different workers'); ++ my $different_jobs = isnt($job_id1, $job_id2, 'each of the two jobs allocated to one of the workers'); ++ diag explain $allocated unless $different_workers && $different_jobs; + + ($allocated) = scheduler_step(); + is @$allocated, 0; + + kill_service($_, 1) for ($w1_pid, $w2_pid); +- + dead_workers($schema); + }; + diff --git a/openqa.spec b/openqa.spec index eb23629..90630c9 100644 --- a/openqa.spec +++ b/openqa.spec @@ -66,7 +66,7 @@ Name: openqa Version: %{github_version} -Release: 41%{?github_date:.%{github_date}git%{shortcommit}}%{?dist} +Release: 42%{?github_date:.%{github_date}git%{shortcommit}}%{?dist} Summary: OS-level automated testing framework License: GPLv2+ Url: http://os-autoinst.github.io/openQA/ @@ -92,6 +92,10 @@ Source4: 23-fedora-messaging.t # Fixes a broken return statement in asset download code that's # breaking our tests Patch0: 0001-Asset-download-fix-return-statement-broken-in-f0d70f.patch +# https://github.com/os-autoinst/openQA/pull/2667 +# Hopefully fixes a scheduling race that occasionally causes incomplete +# jobs +Patch1: 2667.patch BuildRequires: %{python_scripts_requires} BuildRequires: perl-generators @@ -542,6 +546,9 @@ fi %{_datadir}/openqa/lib/OpenQA/WebAPI/Plugin/FedoraUpdateRestart.pm %changelog +* Wed Feb 05 2020 Adam Williamson - 4.6-42.20200101git68ae00a +- Backport #2667 to fix a scheduling race causing incomplete jobs + * Wed Jan 29 2020 Fedora Release Engineering - 4.6-41.20200101git68ae00a - Rebuilt for https://fedoraproject.org/wiki/Fedora_32_Mass_Rebuild