diff --git a/src/condor_daemon_client/dc_startd.cpp b/src/condor_daemon_client/dc_startd.cpp
index 7261c4a..09a2689 100644
--- a/src/condor_daemon_client/dc_startd.cpp
+++ b/src/condor_daemon_client/dc_startd.cpp
@@ -51,7 +51,7 @@ DCStartd::DCStartd( const char* tName, const char* tPool, const char* tAddr,
}
}
-DCStartd::DCStartd( ClassAd *ad, const char *tPool )
+DCStartd::DCStartd( const ClassAd *ad, const char *tPool )
: Daemon(ad,DT_STARTD,tPool),
claim_id(NULL)
{
diff --git a/src/condor_daemon_client/dc_startd.h b/src/condor_daemon_client/dc_startd.h
index c5f3e89..ff20892 100644
--- a/src/condor_daemon_client/dc_startd.h
+++ b/src/condor_daemon_client/dc_startd.h
@@ -49,7 +49,7 @@ public:
DCStartd( const char* const name, const char* const pool,
const char* const addr, const char* const id );
- DCStartd( ClassAd *ad, const char *pool = NULL );
+ DCStartd( const ClassAd *ad, const char *pool = NULL );
/// Destructor.
~DCStartd();
diff --git a/src/defrag/defrag.cpp b/src/defrag/defrag.cpp
index 26aec0a..8710b5d 100644
--- a/src/defrag/defrag.cpp
+++ b/src/defrag/defrag.cpp
@@ -185,6 +185,8 @@ void Defrag::config()
}
}
+ m_can_cancel = param_boolean("DEFRAG_CAN_CANCEL", true);
+
param(m_defrag_name,"DEFRAG_NAME");
int stats_quantum = m_polling_interval;
@@ -487,8 +489,17 @@ void Defrag::poll()
int num_whole_machines = countMachines(m_whole_machine_expr.c_str(),"DEFRAG_WHOLE_MACHINE_EXPR",&whole_machines);
m_stats.WholeMachines = num_whole_machines;
+ MachineSet draining_whole_machines;
+ std::stringstream draining_whole_machines_ss;
+ draining_whole_machines_ss << m_whole_machine_expr << " && Draining && Offline=!=True";
+ int num_draining_whole_machines = countMachines(draining_whole_machines_ss.str().c_str(),
+ "<DEFRAG_WHOLE_MACHINE_EXPR Draining>", &draining_whole_machines);
+
dprintf(D_ALWAYS,"There are currently %d draining and %d whole machines.\n",
num_draining,num_whole_machines);
+ if (num_draining_whole_machines)
+ dprintf(D_ALWAYS, "Of the %d whole machines, %d are in the draining state.\n",
+ num_whole_machines, num_draining_whole_machines);
queryDrainingCost();
@@ -548,8 +559,7 @@ void Defrag::poll()
ClassAdList startdAds;
std::string requirements;
- sprintf(requirements,"(%s) && Draining =!= true",m_defrag_requirements.c_str());
- if( !queryMachines(requirements.c_str(),"DEFRAG_REQUIREMENTS",startdAds) ) {
+ if( !queryMachines(m_defrag_requirements.c_str(),"DEFRAG_REQUIREMENTS",startdAds) ) {
dprintf(D_ALWAYS,"Doing nothing, because the query to select machines matching DEFRAG_REQUIREMENTS failed.\n");
return;
}
@@ -561,12 +571,26 @@ void Defrag::poll()
int num_drained = 0;
ClassAd *startd_ad;
MachineSet machines_done;
+ MachineSet draining_machines_done;
while( (startd_ad=startdAds.Next()) ) {
std::string machine;
std::string name;
startd_ad->LookupString(ATTR_NAME,name);
slotNameToDaemonName(name,machine);
+ if( !draining_machines_done.count(machine) && draining_whole_machines.count(machine) ) {
+ cancel_drain(*startd_ad);
+ draining_machines_done.insert(machine);
+ continue;
+ }
+
+ // Do not consider slots which are already draining.
+ bool startd_currently_draining = false;
+ startd_ad->LookupBool("Draining", startd_currently_draining);
+ if( startd_currently_draining ) {
+ continue;
+ }
+
if( machines_done.count(machine) ) {
dprintf(D_FULLDEBUG,
"Skipping %s: already attempted to drain %s in this cycle.\n",
@@ -581,14 +605,13 @@ void Defrag::poll()
continue;
}
- if( drain(startd_ad) ) {
+ if( (num_drained++ < num_to_drain) && drain(*startd_ad) ) {
machines_done.insert(machine);
- if( ++num_drained >= num_to_drain ) {
+ if( num_drained >= num_to_drain ) {
dprintf(D_ALWAYS,
"Drained maximum number of machines allowed in this cycle (%d).\n",
num_to_drain);
- break;
}
}
}
@@ -601,26 +624,24 @@ void Defrag::poll()
}
bool
-Defrag::drain(ClassAd *startd_ad)
+Defrag::drain(const ClassAd &startd_ad)
{
- ASSERT( startd_ad );
-
std::string name;
- startd_ad->LookupString(ATTR_NAME,name);
+ startd_ad.LookupString(ATTR_NAME,name);
dprintf(D_ALWAYS,"Initiating %s draining of %s.\n",
m_draining_schedule_str.c_str(),name.c_str());
- DCStartd startd( startd_ad );
+ DCStartd startd( &startd_ad );
int graceful_completion = 0;
- startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_COMPLETION,graceful_completion);
+ startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_COMPLETION,graceful_completion);
int quick_completion = 0;
- startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_COMPLETION,quick_completion);
+ startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_COMPLETION,quick_completion);
int graceful_badput = 0;
- startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_BADPUT,graceful_badput);
+ startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_GRACEFUL_DRAINING_BADPUT,graceful_badput);
int quick_badput = 0;
- startd_ad->LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_BADPUT,quick_badput);
+ startd_ad.LookupInteger(ATTR_EXPECTED_MACHINE_QUICK_DRAINING_BADPUT,quick_badput);
time_t now = time(NULL);
std::string draining_check_expr;
@@ -659,6 +680,27 @@ Defrag::drain(ClassAd *startd_ad)
return true;
}
+/**
+ * Ask the startd described by startd_ad to cancel draining.
+ *
+ * Does nothing and returns false when DEFRAG_CAN_CANCEL (m_can_cancel)
+ * is false, so the knob read in config() actually disables cancellation.
+ *
+ * @param startd_ad  machine ad of the startd; ATTR_NAME is read for logging
+ *                   and the ad is used to construct the DCStartd client.
+ * @return the result of DCStartd::cancelDrainJobs(), or false when
+ *         cancellation is disabled by configuration.
+ */
+bool
+Defrag::cancel_drain(const ClassAd &startd_ad)
+{
+	std::string name;
+	startd_ad.LookupString(ATTR_NAME,name);
+
+	if( !m_can_cancel ) {
+		dprintf(D_FULLDEBUG,
+			"Not cancelling draining of %s: DEFRAG_CAN_CANCEL is false.\n",
+			name.c_str());
+		return false;
+	}
+
+	// This is the cancel path; do not reuse drain()'s "Initiating ... draining"
+	// message, which would make the log claim a new drain was started.
+	dprintf(D_ALWAYS,"Cancelling draining of %s.\n",name.c_str());
+
+	DCStartd startd( &startd_ad );
+
+	// NULL request id: presumably cancels whatever drain is active on the
+	// startd -- TODO confirm against DCStartd::cancelDrainJobs().
+	bool rval = startd.cancelDrainJobs( NULL );
+	if ( rval ) {
+		dprintf(D_FULLDEBUG, "Sent request to cancel draining on %s\n", startd.name());
+	} else {
+		dprintf(D_ALWAYS, "Unable to cancel draining on %s: %s\n", startd.name(), startd.error());
+	}
+	return rval;
+}
+
void
Defrag::publish(ClassAd *ad)
{
diff --git a/src/defrag/defrag.h b/src/defrag/defrag.h
index 8c7fd51..909b569 100644
--- a/src/defrag/defrag.h
+++ b/src/defrag/defrag.h
@@ -40,11 +40,11 @@ class Defrag: public Service {
void stop();
void poll(); // do the periodic policy evaluation
- bool drain(ClassAd *startd_ad);
typedef std::set< std::string > MachineSet;
private:
+
int m_polling_interval; // delay between evaluations of the policy
int m_polling_timer;
double m_draining_per_hour;
@@ -58,6 +58,7 @@ class Defrag: public Service {
ClassAd m_rank_ad;
int m_draining_schedule;
std::string m_draining_schedule_str;
+ bool m_can_cancel; // Whether condor_defrag can also cancel draining early.
time_t m_last_poll;
@@ -70,6 +71,9 @@ class Defrag: public Service {
ClassAd m_public_ad;
DefragStats m_stats;
+ bool drain(const ClassAd &startd_ad);
+ bool cancel_drain(const ClassAd &startd_ad);
+
void validateExpr(char const *constraint,char const *constraint_source);
bool queryMachines(char const *constraint,char const *constraint_source,ClassAdList &startdAds);