-
// Launches `task` as a Docker container on behalf of this executor.
//
// Only one task may be run: if `run` is already set, the task is
// rejected with a TASK_FAILED status update and nothing else is done.
// Otherwise the task ID is recorded in `taskId`, the container is
// started through `docker->run()`, and a TASK_RUNNING update is sent
// once `docker->inspect()` yields the container (unless `killed` has
// been set by then).
//
// Parameters:
//   driver  Used to send status updates for the task.
//   task    The task to launch. It must carry a command and a
//           container of type ContainerInfo::DOCKER; both are
//           enforced with CHECK.
void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
  if (run.isSome()) {
    TaskStatus status;
    status.mutable_task_id()->CopyFrom(task.task_id());
    status.set_state(TASK_FAILED);
    status.set_message(
        "Attempted to run multiple tasks using a \"docker\" executor");

    driver->sendStatusUpdate(status);
    return;
  }

  // Capture the TaskID.
  taskId = task.task_id();

  cout << "Starting task " << taskId.get() << endl;

  CHECK(task.has_container());
  CHECK(task.has_command());

  CHECK(task.container().type() == ContainerInfo::DOCKER);

  // We're adding task and executor resources to launch docker since
  // the DockerContainerizer updates the container cgroup limits
  // directly and it expects it to be the sum of both task and
  // executor resources. This does lead to a bit of unaccounted
  // resources for running this executor, but we are assuming
  // this is just a very small amount of overcommit.
  run = docker->run(
      task.container(),
      task.command(),
      containerName,
      sandboxDirectory,
      mappedDirectory,
      task.resources() + task.executor().resources(),
      None(),
      Subprocess::FD(STDOUT_FILENO),
      Subprocess::FD(STDERR_FILENO));

  // Hand the outcome of the run to `reaped`, deferred onto this
  // process, together with the driver needed for status updates.
  run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));

  // Delay sending TASK_RUNNING status update until we receive
  // inspect output.
  inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY)
    .then(defer(self(), [=](const Docker::Container& container) {
      // Skip the TASK_RUNNING update if the task was killed in the
      // meantime.
      if (!killed) {
        TaskStatus status;
        status.mutable_task_id()->CopyFrom(taskId.get());
        status.set_state(TASK_RUNNING);
        status.set_data(container.output);
        if (container.ipAddress.isSome()) {
          // The same IP address is published three ways below: as a
          // label and in two NetworkInfo fields. Two of them are
          // marked deprecated by the TODOs.

          // TODO(karya): Deprecated -- Remove after 0.25.0 has shipped.
          Label* label = status.mutable_labels()->add_labels();
          label->set_key("Docker.NetworkSettings.IPAddress");
          label->set_value(container.ipAddress.get());

          NetworkInfo* networkInfo =
            status.mutable_container_status()->add_network_infos();

          // TODO(CD): Deprecated -- Remove after 0.27.0.
          networkInfo->set_ip_address(container.ipAddress.get());

          NetworkInfo::IPAddress* ipAddress =
            networkInfo->add_ip_addresses();
          ipAddress->set_ip_address(container.ipAddress.get());
        }
        driver->sendStatusUpdate(status);
      }

      return Nothing();
    }));

  // Health checking is only started once the inspect chain above has
  // completed successfully.
  inspect.onReady(
      defer(self(), &Self::launchHealthCheck, containerName, task));
}
评论前必须登录!
立即登录