-
void launchTasks(const vector<OfferID>& offerIds,
-
const vector<TaskInfo>& tasks,
-
const Filters& filters)
-
{
-
Offer::Operation operation;
-
operation.set_type(Offer::Operation::LAUNCH);
-
-
Offer::Operation::Launch* launch = operation.mutable_launch();
-
foreach (const TaskInfo& task, tasks) {
-
launch->add_task_infos()->CopyFrom(task);
-
}
-
-
acceptOffers(offerIds, {operation}, filters);
-
}
-
-
void acceptOffers(
-
const vector<OfferID>& offerIds,
-
const vector<Offer::Operation>& operations,
-
const Filters& filters)
-
{
-
// TODO(jieyu): Move all driver side verification to master since
-
// we are moving towards supporting pure launguage scheduler.
-
-
if (!connected) {
-
VLOG(1) << "Ignoring accept offers message as master is disconnected";
-
-
// NOTE: Reply to the framework with TASK_LOST messages for each
-
// task launch. See details from notes in launchTasks.
-
foreach (const Offer::Operation& operation, operations) {
-
if (operation.type() != Offer::Operation::LAUNCH) {
-
continue;
-
}
-
-
foreach (const TaskInfo& task, operation.launch().task_infos()) {
-
StatusUpdate update = protobuf::createStatusUpdate(
-
framework.id(),
-
None(),
-
task.task_id(),
-
TASK_LOST,
-
TaskStatus::SOURCE_MASTER,
-
None(),
-
"Master disconnected",
-
TaskStatus::REASON_MASTER_DISCONNECTED);
-
-
statusUpdate(UPID(), update, UPID());
-
}
-
}
-
return;
-
}
-
-
Call call;
-
CHECK(framework.has_id());
-
call.mutable_framework_id()->CopyFrom(framework.id());
-
call.set_type(Call::ACCEPT);
-
-
Call::Accept* accept = call.mutable_accept();
-
-
// Setting accept.operations.
-
foreach (const Offer::Operation& _operation, operations) {
-
Offer::Operation* operation = accept->add_operations();
-
operation->CopyFrom(_operation);
-
}
-
-
// Setting accept.offer_ids.
-
foreach (const OfferID& offerId, offerIds) {
-
accept->add_offer_ids()->CopyFrom(offerId);
-
-
if (!savedOffers.contains(offerId)) {
-
// TODO(jieyu): A duplicated offer ID could also cause this
-
// warning being printed. Consider refine this message here
-
// and in launchTasks as well.
-
LOG(WARNING) << "Attempting to accept an unknown offer " << offerId;
-
} else {
-
// Keep only the slave PIDs where we run tasks so we can send
-
// framework messages directly.
-
foreach (const Offer::Operation& operation, operations) {
-
if (operation.type() != Offer::Operation::LAUNCH) {
-
continue;
-
}
-
-
foreach (const TaskInfo& task, operation.launch().task_infos()) {
-
const SlaveID& slaveId = task.slave_id();
-
-
if (savedOffers[offerId].contains(slaveId)) {
-
savedSlavePids[slaveId] = savedOffers[offerId][slaveId];
-
} else {
-
LOG(WARNING) << "Attempting to launch task " << task.task_id()
-
<< " with the wrong slave id " << slaveId;
-
}
-
}
-
}
-
}
-
-
// Remove the offer since we saved all the PIDs we might use.
-
savedOffers.erase(offerId);
-
}
-
-
// Setting accept.filters.
-
accept->mutable_filters()->CopyFrom(filters);
-
-
CHECK_SOME(master);
-
send(master.get().pid(), call);
-
}
还没有人抢沙发呢~