-
void Master::_accept(
-
const FrameworkID& frameworkId,
-
const SlaveID& slaveId,
-
const Resources& offeredResources,
-
const scheduler::Call::Accept& accept,
-
const Future<list<Future<bool>>>& _authorizations)
-
{
-
Framework* framework = getFramework(frameworkId);
-
……
-
-
Slave* slave = slaves.registered.get(slaveId);
-
-
if (slave == NULL || !slave->connected) {
-
foreach (const Offer::Operation& operation, accept.operations()) {
-
if (operation.type() != Offer::Operation::LAUNCH) {
-
continue;
-
}
-
-
foreach (const TaskInfo& task, operation.launch().task_infos()) {
-
const TaskStatus::Reason reason =
-
slave == NULL ? TaskStatus::REASON_SLAVE_REMOVED
-
: TaskStatus::REASON_SLAVE_DISCONNECTED;
-
const StatusUpdate& update = protobuf::createStatusUpdate(
-
framework->id(),
-
task.slave_id(),
-
task.task_id(),
-
TASK_LOST,
-
TaskStatus::SOURCE_MASTER,
-
None(),
-
slave == NULL ? “Slave removed” : “Slave disconnected“,
-
reason);
-
-
metrics->tasks_lost++;
-
-
metrics->incrementTasksStates(
-
TASK_LOST,
-
TaskStatus::SOURCE_MASTER,
-
reason);
-
-
forward(update, UPID(), framework);
-
}
-
}
-
-
// Tell the allocator about the recovered resources.
-
allocator->recoverResources(
-
frameworkId,
-
slaveId,
-
offeredResources,
-
None());
-
-
return;
-
}
-
-
// Some offer operations update the offered resources. We keep
-
// updated offered resources here. When a task is successfully
-
// launched, we remove its resource from offered resources.
-
Resources _offeredResources = offeredResources;
-
-
// The order of `authorizations` must match the order of the operations in
-
// `accept.operations()`, as they are iterated through simultaneously.
-
CHECK_READY(_authorizations);
-
list<Future<bool>> authorizations = _authorizations.get();
-
-
foreach (const Offer::Operation& operation, accept.operations()) {
-
switch (operation.type()) {
-
// The RESERVE operation allows a principal to reserve resources.
-
case Offer::Operation::RESERVE: {
-
Future<bool> authorization = authorizations.front();
-
authorizations.pop_front();
-
-
CHECK(!authorization.isDiscarded());
-
-
if (authorization.isFailed()) {
-
// TODO(greggomann): We may want to retry this failed authorization
-
// request rather than dropping it immediately.
-
drop(framework,
-
operation,
-
“Authorization of principal ‘” + framework->info.principal() +
-
“‘ to reserve resources failed: ” +
-
authorization.failure());
-
-
continue;
-
} elseif (!authorization.get()) {
-
drop(framework,
-
operation,
-
“Not authorized to reserve resources as ‘” +
-
framework->info.principal() + “‘“);
-
-
continue;
-
}
-
-
Option<string> principal = framework->info.has_principal()
-
? framework->info.principal()
-
: Option<string>::none();
-
-
// Make sure this reserve operation is valid.
-
Option<Error> error = validation::operation::validate(
-
operation.reserve(), principal);
-
-
if (error.isSome()) {
-
drop(framework, operation, error.get().message);
-
continue;
-
}
-
-
// Test the given operation on the included resources.
-
Try<Resources> resources = _offeredResources.apply(operation);
-
if (resources.isError()) {
-
drop(framework, operation, resources.error());
-
continue;
-
}
-
-
_offeredResources = resources.get();
-
-
LOG(INFO) << “Applying RESERVE operation for resources “
-
<< operation.reserve().resources() << “ from framework “
-
<< *framework << “ to slave ” << *slave;
-
-
apply(framework, slave, operation);
-
break;
-
}
-
-
// The UNRESERVE operation allows a principal to unreserve resources.
-
case Offer::Operation::UNRESERVE: {
-
Future<bool> authorization = authorizations.front();
-
authorizations.pop_front();
-
-
CHECK(!authorization.isDiscarded());
-
-
if (authorization.isFailed()) {
-
// TODO(greggomann): We may want to retry this failed authorization
-
// request rather than dropping it immediately.
-
drop(framework,
-
operation,
-
“Authorization of principal ‘” + framework->info.principal() +
-
“‘ to unreserve resources failed: ” +
-
authorization.failure());
-
-
continue;
-
} elseif (!authorization.get()) {
-
drop(framework,
-
operation,
-
“Not authorized to unreserve resources as ‘” +
-
framework->info.principal() + “‘“);
-
-
continue;
-
}
-
-
// Make sure this unreserve operation is valid.
-
Option<Error> error = validation::operation::validate(
-
operation.unreserve());
-
-
if (error.isSome()) {
-
drop(framework, operation, error.get().message);
-
continue;
-
}
-
-
// Test the given operation on the included resources.
-
Try<Resources> resources = _offeredResources.apply(operation);
-
if (resources.isError()) {
-
drop(framework, operation, resources.error());
-
continue;
-
}
-
-
_offeredResources = resources.get();
-
-
LOG(INFO) << “Applying UNRESERVE operation for resources “
-
<< operation.unreserve().resources() << “ from framework “
-
<< *framework << “ to slave ” << *slave;
-
-
apply(framework, slave, operation);
-
break;
-
}
-
-
case Offer::Operation::CREATE: {
-
Future<bool> authorization = authorizations.front();
-
authorizations.pop_front();
-
-
CHECK(!authorization.isDiscarded());
-
-
if (authorization.isFailed()) {
-
// TODO(greggomann): We may want to retry this failed authorization
-
// request rather than dropping it immediately.
-
drop(framework,
-
operation,
-
“Authorization of principal ‘” + framework->info.principal() +
-
“‘ to create persistent volumes failed: ” +
-
authorization.failure());
-
-
continue;
-
} elseif (!authorization.get()) {
-
drop(framework,
-
operation,
-
“Not authorized to create persistent volumes as ‘” +
-
framework->info.principal() + “‘“);
-
-
continue;
-
}
-
-
// Make sure this create operation is valid.
-
Option<Error> error = validation::operation::validate(
-
operation.create(), slave->checkpointedResources);
-
-
if (error.isSome()) {
-
drop(framework, operation, error.get().message);
-
continue;
-
}
-
-
Try<Resources> resources = _offeredResources.apply(operation);
-
if (resources.isError()) {
-
drop(framework, operation, resources.error());
-
continue;
-
}
-
-
_offeredResources = resources.get();
-
-
LOG(INFO) << “Applying CREATE operation for volumes “
-
<< operation.create().volumes() << “ from framework “
-
<< *framework << “ to slave ” << *slave;
-
-
apply(framework, slave, operation);
-
break;
-
}
-
-
case Offer::Operation::DESTROY: {
-
Future<bool> authorization = authorizations.front();
-
authorizations.pop_front();
-
-
CHECK(!authorization.isDiscarded());
-
-
if (authorization.isFailed()) {
-
// TODO(greggomann): We may want to retry this failed authorization
-
// request rather than dropping it immediately.
-
drop(framework,
-
operation,
-
“Authorization of principal ‘” + framework->info.principal() +
-
“‘ to destroy persistent volumes failed: ” +
-
authorization.failure());
-
-
continue;
-
} elseif (!authorization.get()) {
-
drop(framework,
-
operation,
-
“Not authorized to destroy persistent volumes as ‘” +
-
framework->info.principal() + “‘“);
-
-
continue;
-
}
-
-
// Make sure this destroy operation is valid.
-
Option<Error> error = validation::operation::validate(
-
operation.destroy(), slave->checkpointedResources);
-
-
if (error.isSome()) {
-
drop(framework, operation, error.get().message);
-
continue;
-
}
-
-
Try<Resources> resources = _offeredResources.apply(operation);
-
if (resources.isError()) {
-
drop(framework, operation, resources.error());
-
continue;
-
}
-
-
_offeredResources = resources.get();
-
-
LOG(INFO) << “Applying DESTROY operation for volumes “
-
<< operation.create().volumes() << “ from framework “
-
<< *framework << “ to slave ” << *slave;
-
-
apply(framework, slave, operation);
-
break;
-
}
-
-
case Offer::Operation::LAUNCH: {
-
foreach (const TaskInfo& task, operation.launch().task_infos()) {
-
Future<bool> authorization = authorizations.front();
-
authorizations.pop_front();
-
-
// NOTE: The task will not be in ‘pendingTasks’ if
-
// ‘killTask()’ for the task was called before we are here.
-
// No need to launch the task if it’s no longer pending.
-
// However, we still need to check the authorization result
-
// and do the validation so that we can send status update
-
// in case the task has duplicated ID.
-
bool pending = framework->pendingTasks.contains(task.task_id());
-
-
// Remove from pending tasks.
-
framework->pendingTasks.erase(task.task_id());
-
-
CHECK(!authorization.isDiscarded());
-
-
if (authorization.isFailed() || !authorization.get()) {
-
string user = framework->info.user(); // Default user.
-
if (task.has_command() && task.command().has_user()) {
-
user = task.command().user();
-
} elseif (task.has_executor() &&
-
task.executor().command().has_user()) {
-
user = task.executor().command().user();
-
}
-
-
const StatusUpdate& update = protobuf::createStatusUpdate(
-
framework->id(),
-
task.slave_id(),
-
task.task_id(),
-
TASK_ERROR,
-
TaskStatus::SOURCE_MASTER,
-
None(),
-
authorization.isFailed() ?
-
“Authorization failure: ” + authorization.failure() :
-
“Not authorized to launch as user ‘” + user + “‘“,
-
TaskStatus::REASON_TASK_UNAUTHORIZED);
-
-
metrics->tasks_error++;
-
-
metrics->incrementTasksStates(
-
TASK_ERROR,
-
TaskStatus::SOURCE_MASTER,
-
TaskStatus::REASON_TASK_UNAUTHORIZED);
-
-
forward(update, UPID(), framework);
-
-
continue;
-
}
-
-
// Validate the task.
-
-
// Make a copy of the original task so that we can
-
// fill the missing `framework_id` in ExecutorInfo
-
// if needed. This field was added to the API later
-
// and thus was made optional.
-
TaskInfo task_(task);
-
if (task.has_executor() && !task.executor().has_framework_id()) {
-
task_.mutable_executor()
-
->mutable_framework_id()->CopyFrom(framework->id());
-
}
-
-
const Option<Error>& validationError = validation::task::validate(
-
task_,
-
framework,
-
slave,
-
_offeredResources);
-
-
if (validationError.isSome()) {
-
const StatusUpdate& update = protobuf::createStatusUpdate(
-
framework->id(),
-
task_.slave_id(),
-
task_.task_id(),
-
TASK_ERROR,
-
TaskStatus::SOURCE_MASTER,
-
None(),
-
validationError.get().message,
-
TaskStatus::REASON_TASK_INVALID);
-
-
metrics->tasks_error++;
-
-
metrics->incrementTasksStates(
-
TASK_ERROR,
-
TaskStatus::SOURCE_MASTER,
-
TaskStatus::REASON_TASK_INVALID);
-
-
forward(update, UPID(), framework);
-
-
continue;
-
}
-
-
// Add task.
-
if (pending) {
-
_offeredResources -= addTask(task_, framework, slave);
-
-
// TODO(bmahler): Consider updating this log message to
-
// indicate when the executor is also being launched.
-
LOG(INFO) << “Launching task ” << task_.task_id()
-
<< “ of framework ” << *framework
-
<< “ with resources ” << task_.resources()
-
<< “ on slave ” << *slave;
-
-
RunTaskMessage message;
-
message.mutable_framework()->MergeFrom(framework->info);
-
-
// TODO(anand): We set ‘pid’ to UPID() for http frameworks
-
// as ‘pid’ was made optional in 0.24.0. In 0.25.0, we
-
// no longer have to set pid here for http frameworks.
-
message.set_pid(framework->pid.getOrElse(UPID()));
-
message.mutable_task()->MergeFrom(task_);
-
-
if (HookManager::hooksAvailable()) {
-
// Set labels retrieved from label-decorator hooks.
-
message.mutable_task()->mutable_labels()->CopyFrom(
-
HookManager::masterLaunchTaskLabelDecorator(
-
task_,
-
framework->info,
-
slave->info));
-
}
-
-
send(slave->pid, message);
-
}
-
}
-
break;
-
}
-
-
default:
-
LOG(ERROR) << “Unsupported offer operation ” << operation.type();
-
break;
-
}
-
}
-
-
if (!_offeredResources.empty()) {
-
// Tell the allocator about the unused (e.g., refused) resources.
-
allocator->recoverResources(
-
frameworkId,
-
slaveId,
-
_offeredResources,
-
accept.filters());
-
}
-
}
// [Web page residue, not part of the source code]
// "You must log in before commenting. Log in now."