diff --git a/src/condor_daemon_client/dc_startd.cpp b/src/condor_daemon_client/dc_startd.cpp index 7261c4a..09a2689 100644 --- a/src/condor_daemon_client/dc_startd.cpp +++ b/src/condor_daemon_client/dc_startd.cpp @@ -51,7 +51,7 @@ DCStartd::DCStartd( const char* tName, const char* tPool, const char* tAddr, } } -DCStartd::DCStartd( ClassAd *ad, const char *tPool ) +DCStartd::DCStartd( const ClassAd *ad, const char *tPool ) : Daemon(ad,DT_STARTD,tPool), claim_id(NULL) { diff --git a/src/condor_daemon_client/dc_startd.h b/src/condor_daemon_client/dc_startd.h index c5f3e89..ff20892 100644 --- a/src/condor_daemon_client/dc_startd.h +++ b/src/condor_daemon_client/dc_startd.h @@ -49,7 +49,7 @@ public: DCStartd( const char* const name, const char* const pool, const char* const addr, const char* const id ); - DCStartd( ClassAd *ad, const char *pool = NULL ); + DCStartd( const ClassAd *ad, const char *pool = NULL ); /// Destructor. ~DCStartd(); diff --git a/src/defrag/defrag.cpp b/src/defrag/defrag.cpp index 26aec0a..8710b5d 100644 --- a/src/defrag/defrag.cpp +++ b/src/defrag/defrag.cpp @@ -185,6 +185,8 @@ void Defrag::config() } } + m_can_cancel = param_boolean("DEFRAG_CAN_CANCEL", true); + param(m_defrag_name,"DEFRAG_NAME"); int stats_quantum = m_polling_interval; @@ -487,8 +489,17 @@ void Defrag::poll() int num_whole_machines = countMachines(m_whole_machine_expr.c_str(),"DEFRAG_WHOLE_MACHINE_EXPR",&whole_machines); m_stats.WholeMachines = num_whole_machines; + MachineSet draining_whole_machines; + std::stringstream draining_whole_machines_ss; + draining_whole_machines_ss << m_whole_machine_expr << " && Draining && Offline=!=True"; + int num_draining_whole_machines = countMachines(draining_whole_machines_ss.str().c_str(), + "", &draining_whole_machines); + dprintf(D_ALWAYS,"There are currently %d draining and %d whole machines.\n", num_draining,num_whole_machines); + if (num_draining_whole_machines) + dprintf(D_ALWAYS, "Of the %d whole machines, %d are in the draining state.\n", + num_whole_machines, num_draining_whole_machines); queryDrainingCost(); @@ -548,8 +559,7 @@ void Defrag::poll() ClassAdList startdAds; std::string requirements; - sprintf(requirements,"(%s) && Draining =!= true",m_defrag_requirements.c_str()); - if( !queryMachines(requirements.c_str(),"DEFRAG_REQUIREMENTS",startdAds) ) { + if( !queryMachines(m_defrag_requirements.c_str(),"DEFRAG_REQUIREMENTS",startdAds) ) { dprintf(D_ALWAYS,"Doing nothing, because the query to select machines matching DEFRAG_REQUIREMENTS failed.\n"); return; } @@ -561,12 +571,26 @@ void Defrag::poll() int num_drained = 0; ClassAd *startd_ad; MachineSet machines_done; + MachineSet draining_machines_done; while( (startd_ad=startdAds.Next()) ) { std::string machine; std::string name; startd_ad->LookupString(ATTR_NAME,name); slotNameToDaemonName(name,machine); + if( !draining_machines_done.count(machine) && draining_whole_machines.count(machine) ) { + cancel_drain(*startd_ad); + draining_machines_done.insert(machine); + continue; + } + + // Do not consider slots which are already draining. + bool startd_currently_draining = false; + startd_ad->LookupBool("Draining", startd_currently_draining); + if( startd_currently_draining ) { + continue; + } + if( machines_done.count(machine) ) { dprintf(D_FULLDEBUG, "Skipping %s: already attempted to drain %s in this cycle.\n", @@ -581,14 +605,13 @@ void Defrag::poll() continue; } - if( drain(startd_ad) ) { + if( (num_drained++ < num_to_drain) && drain(*startd_ad) ) { machines_done.insert(machine); - if( ++num_drained >= num_to_drain ) { + if( num_drained >= num_to_drain ) { dprintf(D_ALWAYS, "Drained maximum number of machines allowed in this cycle (%d).\n", num_to_drain); - break; } } } @@ -601,26 +624,24 @@ void Defrag::poll() } bool -Defrag::drain(ClassAd *startd_ad) +Defrag::drain(const ClassAd &startd_ad) { - ASSERT( startd_ad ); - std::string name; - startd_ad->LookupString(ATTR_NAME,name); + startd_ad.LookupString(ATTR_NAME,name); dprintf(D_ALWAYS,"Initiating %s draining of %s.\n", m_draining_schedule_str.c_str(),name.c_str()); - DCStartd startd( startd_ad ); + DCStartd startd( &startd_ad ); int graceful_completion = 0; - startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_COMPLETION,graceful_completion); + startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_COMPLETION,graceful_completion); int quick_completion = 0; - startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_COMPLETION,quick_completion); + startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_COMPLETION,quick_completion); int graceful_badput = 0; - startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_BADPUT,graceful_badput); + startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_BADPUT,graceful_badput); int quick_badput = 0; - startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_BADPUT,quick_badput); + startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_BADPUT,quick_badput); time_t now = time(NULL); std::string draining_check_expr; @@ -659,6 +680,27 @@ Defrag::drain(ClassAd *startd_ad) return true; } +bool +Defrag::cancel_drain(const ClassAd &startd_ad) +{ + + std::string name; + startd_ad.LookupString(ATTR_NAME,name); + + dprintf(D_ALWAYS,"Initiating %s draining of %s.\n", + m_draining_schedule_str.c_str(),name.c_str()); + + DCStartd startd( &startd_ad ); + + bool rval = startd.cancelDrainJobs( NULL ); + if ( rval ) { + dprintf(D_FULLDEBUG, "Sent request to cancel draining on %s\n", startd.name()); + } else { + dprintf(D_ALWAYS, "Unable to cancel draining on %s: %s\n", startd.name(), startd.error()); + } + return rval; +} + void Defrag::publish(ClassAd *ad) { diff --git a/src/defrag/defrag.h b/src/defrag/defrag.h index 8c7fd51..909b569 100644 --- a/src/defrag/defrag.h +++ b/src/defrag/defrag.h @@ -40,11 +40,11 @@ class Defrag: public Service { void stop(); void poll(); // do the periodic policy evaluation - bool drain(ClassAd *startd_ad); typedef std::set< std::string > MachineSet; private: + int m_polling_interval; // delay between evaluations of the policy int m_polling_timer; double m_draining_per_hour; @@ -58,6 +58,7 @@ class Defrag: public Service { ClassAd m_rank_ad; int m_draining_schedule; std::string m_draining_schedule_str; + bool m_can_cancel; // Whether condor_defrag can also cancel draining early. time_t m_last_poll; @@ -70,6 +71,9 @@ class Defrag: public Service { ClassAd m_public_ad; DefragStats m_stats; + bool drain(const ClassAd &startd_ad); + bool cancel_drain(const ClassAd &startd_ad); + void validateExpr(char const *constraint,char const *constraint_source); bool queryMachines(char const *constraint,char const *constraint_source,ClassAdList &startdAds);