12 #include "grpc_client.h"
13 #include "grpc_service.pb.h"
30 auto pipe = popen((cmd +
" 2>&1").c_str(),
"r");
31 int thisErrno = errno;
33 throw cms::Exception(
"SystemError") <<
"popen() failed with errno " << thisErrno <<
" for command: " <<
cmd;
36 constexpr
static unsigned buffSize = 128;
37 std::array<char, buffSize>
buffer;
40 if (fgets(buffer.data(), buffSize,
pipe))
41 result += buffer.data();
43 thisErrno = ferror(
pipe);
45 throw cms::Exception(
"SystemError") <<
"failed reading command output with errno " << thisErrno;
49 int rv = pclose(
pipe);
50 return std::make_pair(result, rv);
55 : verbose_(pset.getUntrackedParameter<bool>(
"verbose")),
58 allowAddModel_(
false),
59 startedFallback_(
false),
60 pid_(std::to_string(::getpid())) {
73 #ifdef TRITON_ENABLE_GPU
78 servers_.emplace(std::piecewise_construct,
86 msg =
"List of models for each server:\n";
93 <<
"Not allowed to specify more than one server with same name (" << serverName <<
")";
96 std::unique_ptr<tc::InferenceServerGrpcClient>
client;
98 tc::InferenceServerGrpcClient::Create(&client,
server.url,
false,
server.useSsl,
server.sslOptions),
99 "TritonService(): unable to create inference context for " + serverName +
" (" +
server.url +
")");
102 inference::ServerMetadataResponse serverMetaResponse;
104 "TritonService(): unable to get metadata for " + serverName +
" (" +
server.url +
")");
106 <<
", version = " << serverMetaResponse.version();
109 inference::RepositoryIndexResponse repoIndexResponse;
111 client->ModelRepositoryIndex(&repoIndexResponse),
112 "TritonService(): unable to get repository index for " + serverName +
" (" +
server.url +
")");
116 msg += serverName +
": ";
117 for (
const auto& modelIndex : repoIndexResponse.models()) {
118 const auto& modelName = modelIndex.name();
119 auto mit =
models_.find(modelName);
121 mit =
models_.emplace(modelName,
"").first;
122 auto& modelInfo(mit->second);
123 modelInfo.servers.insert(serverName);
124 server.models.insert(modelName);
126 msg += modelName +
", ";
143 throw cms::Exception(
"DisallowedAddModel") <<
"Attempt to call addModel() outside of module constructors";
145 auto mit =
models_.find(modelName);
147 auto& modelInfo(
unservedModels_.emplace(modelName, path).first->second);
163 const auto& moduleInfo(oit->second);
166 auto& modelInfo(mit->second);
167 modelInfo.modules.erase(
id);
169 if (modelInfo.modules.empty())
178 auto mit =
models_.find(model);
180 throw cms::Exception(
"MissingModel") <<
"There are no servers that provide model " <<
model;
181 const auto& modelInfo(mit->second);
182 const auto& modelServers = modelInfo.servers;
184 auto msit = modelServers.end();
185 if (!preferred.empty()) {
186 msit = modelServers.find(preferred);
188 if (msit == modelServers.end())
189 edm::LogWarning(
"PreferredServer") <<
"Preferred server " << preferred <<
" for model " << model
190 <<
" not available, will choose another server";
192 const auto& serverName(msit == modelServers.end() ? *modelServers.begin() : preferred);
206 msg =
"List of models for fallback server: ";
210 auto& modelInfo(
models_.emplace(modelName,
model).first->second);
212 server.models.insert(modelName);
214 msg += modelName +
", ";
235 for (
const auto& [modelName,
model] : unservedModels_) {
236 command +=
" -m " +
model.path;
243 unservedModels_.clear();
263 const auto& [
output, rv] = execSys(command);
267 throw cms::Exception(
"FallbackFailed") <<
"Starting the fallback server failed with exit code " << rv;
270 const std::string& portIndicator(
"CMS_TRITON_GRPC_PORT: ");
272 auto pos =
output.rfind(portIndicator);
273 if (pos != std::string::npos) {
274 auto pos2 = pos + portIndicator.size();
275 auto pos3 =
output.find(
'\n', pos2);
276 const auto& portNum =
output.substr(pos2, pos3 - pos2);
277 server.url +=
":" + portNum;
279 throw cms::Exception(
"FallbackFailed") <<
"Unknown port for fallback server, log follows:\n" <<
output;
289 validator.addUntracked<
unsigned>(
"port");
290 validator.addUntracked<
bool>(
"useSsl",
false);
291 validator.addUntracked<
std::string>(
"rootCertificates",
"");
292 validator.addUntracked<
std::string>(
"privateKey",
"");
293 validator.addUntracked<
std::string>(
"certificateChain",
"");
void watchPostModuleConstruction(PostModuleConstruction::slot_type const &iSlot)
void addWithDefaultLabel(ParameterSetDescription const &psetDescription)
std::unordered_map< std::string, Model > models_
Server serverInfo(const std::string &model, const std::string &preferred="") const
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
std::unordered_map< std::string, Model > unservedModels_
void watchPreModuleConstruction(PreModuleConstruction::slot_type const &iSlot)
ParameterSet const & getParameterSet(ParameterSetID const &id)
void postModuleConstruction(edm::ModuleDescription const &)
void watchPreModuleDestruction(PreModuleDestruction::slot_type const &iSlot)
unsigned currentModuleId_
TritonService(const edm::ParameterSet &pset, edm::ActivityRegistry &areg)
static const std::string fallbackAddress
void throwIfError(const Error &err, std::string_view msg)
void addModel(const std::string &modelName, const std::string &path)
FallbackOpts fallbackOpts_
if(conf_.getParameter< bool >("UseStripCablingDB"))
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
ParameterDescriptionBase * add(U const &iLabel, T const &value)
void preModuleConstruction(edm::ModuleDescription const &)
std::string createGlobalIdentifier(bool binary=false)
Log< level::Info, false > LogInfo
void preModuleDestruction(edm::ModuleDescription const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::unordered_map< unsigned, Module > modules_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
VParameterSet getUntrackedParameterSetVector(std::string const &name, VParameterSet const &defaultValue) const
ParameterDescriptionBase * addVPSetUntracked(U const &iLabel, ParameterSetDescription const &validator, std::vector< ParameterSet > const &defaults)
std::unordered_map< std::string, Server > servers_
static const std::string fallbackName