15 #include "grpc_client.h" 16 #include "grpc_service.pb.h" 37 auto pipe = popen((
cmd +
" 2>&1").c_str(),
"r");
38 int thisErrno = errno;
41 <<
"TritonService: popen() failed with errno " << thisErrno <<
" for command: " <<
cmd;
45 std::array<char, buffSize>
buffer;
51 thisErrno = ferror(
pipe);
54 <<
"TritonService: failed reading command output with errno " << thisErrno;
58 int rv = pclose(
pipe);
59 return std::make_pair(
result, rv);
66 if (
pos != std::string::npos) {
67 auto pos2 =
pos + indicator.size();
68 auto pos3 =
output.find(
'\n', pos2);
69 return output.substr(pos2, pos3 - pos2);
76 : verbose_(
pset.getUntrackedParameter<
bool>(
"verbose")),
79 allowAddModel_(
false),
80 startedFallback_(
false),
98 if (!siteconf_address.empty() and !siteconf_port.empty()) {
100 std::piecewise_construct,
104 edm::LogInfo(
"TritonDiscovery") <<
"Obtained server from SITECONF: " 106 }
else if (siteconf_address.empty() != siteconf_port.empty()) {
107 edm::LogWarning(
"TritonDiscovery") <<
"Incomplete server information from SITECONF: HOST = " << siteconf_address
108 <<
", PORT = " << siteconf_port;
110 edm::LogWarning(
"TritonDiscovery") <<
"No server information from SITECONF";
113 for (
const auto& serverPset :
pset.getUntrackedParameterSetVector(
"servers")) {
119 <<
"TritonService: Not allowed to specify more than one server with same name (" << serverName <<
")";
125 msg =
"List of models for each server:\n";
127 std::unique_ptr<tc::InferenceServerGrpcClient>
client;
130 "TritonService(): unable to create inference context for " + serverName +
" (" +
server.url +
")",
134 inference::ServerMetadataResponse serverMetaResponse;
135 auto err =
client->ServerMetadata(&serverMetaResponse);
138 <<
", version = " << serverMetaResponse.version();
140 edm::LogInfo(
"TritonService") <<
"unable to get metadata for " + serverName +
" (" +
server.url +
")";
145 inference::RepositoryIndexResponse repoIndexResponse;
146 auto err =
client->ModelRepositoryIndex(&repoIndexResponse);
150 msg += serverName +
": ";
152 for (
const auto& modelIndex : repoIndexResponse.models()) {
153 const auto&
modelName = modelIndex.name();
157 auto& modelInfo(mit->second);
158 modelInfo.servers.insert(serverName);
165 msg +=
"unable to get repository index";
167 edm::LogWarning(
"TritonFailure") <<
"TritonService(): unable to get repository index for " + serverName +
" (" +
190 <<
"TritonService: Attempt to call addModel() outside of module constructors";
210 const auto& moduleInfo(oit->second);
213 auto& modelInfo(mit->second);
214 modelInfo.modules.erase(
id);
216 if (modelInfo.modules.empty())
227 throw cms::Exception(
"MissingModel") <<
"TritonService: There are no servers that provide model " <<
model;
228 const auto& modelInfo(mit->second);
229 const auto& modelServers = modelInfo.servers;
231 auto msit = modelServers.end();
232 if (!preferred.empty()) {
233 msit = modelServers.find(preferred);
235 if (msit == modelServers.end())
236 edm::LogWarning(
"PreferredServer") <<
"Preferred server " << preferred <<
" for model " <<
model 237 <<
" not available, will choose another server";
239 const auto& serverName(msit == modelServers.end() ? *modelServers.begin() : preferred);
255 servers_.emplace(std::piecewise_construct,
261 msg =
"List of models for fallback server: ";
321 printFallbackServerLog<edm::LogError>();
323 <<
"TritonService: Starting the fallback server failed with exit code " << rv;
329 if (chosenDevice ==
"auto") {
330 chosenDevice = extractFromLog(
output,
"CMS_TRITON_CHOSEN_DEVICE: ");
331 if (!chosenDevice.empty()) {
332 if (chosenDevice ==
"cpu")
334 else if (chosenDevice ==
"gpu")
338 <<
"TritonService: unsupported device choice " << chosenDevice <<
" for fallback server, log follows:\n" 342 <<
"TritonService: unknown device choice for fallback server, log follows:\n" 346 std::transform(chosenDevice.begin(), chosenDevice.end(), chosenDevice.begin(), toupper);
348 edm::LogInfo(
"TritonDiscovery") <<
"Fallback server started: " << chosenDevice;
351 const auto& portNum = extractFromLog(
output,
"CMS_TRITON_GRPC_PORT: ");
352 if (!portNum.empty())
353 server.url +=
":" + portNum;
356 <<
"TritonService: Unknown port for fallback server, log follows:\n" 383 printFallbackServerLog<edm::LogError>();
386 std::string stopMsg =
fmt::format(
"TritonService: Stopping the fallback server failed with exit code {}", rv);
395 printFallbackServerLog<edm::LogInfo>();
399 template <
typename LOG>
405 bool foundLog =
false;
406 for (
const auto&
logName : logNames) {
409 LOG(
"TritonService") <<
"TritonService: server log " <<
logName <<
"\n" <<
infile.rdbuf();
415 LOG(
"TritonService") <<
"TritonService: could not find server log " << logNames[0] <<
" in current directory or " 421 desc.addUntracked<
bool>(
"verbose",
false);
426 validator.addUntracked<
unsigned>(
"port");
427 validator.addUntracked<
bool>(
"useSsl",
false);
428 validator.addUntracked<
std::string>(
"rootCertificates",
"");
429 validator.addUntracked<
std::string>(
"privateKey",
"");
430 validator.addUntracked<
std::string>(
"certificateChain",
"");
432 desc.addVPSetUntracked(
"servers", validator, {});
439 edm::allowedValues<std::string>(
"apptainer",
"docker",
"podman"));
441 edm::allowedValues<std::string>(
"auto",
"cpu",
"gpu"));
void watchPostModuleConstruction(PostModuleConstruction::slot_type const &iSlot)
ParameterDescriptionNode * ifValue(ParameterDescription< T > const &switchParameter, std::unique_ptr< ParameterDescriptionCases< T >> cases)
void addWithDefaultLabel(ParameterSetDescription const &psetDescription)
std::unordered_map< std::string, Model > models_
static const std::string siteconfName
void watchPreallocate(Preallocate::slot_type const &iSlot)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void notifyCallStatus(bool status) const
std::unordered_map< std::string, Model > unservedModels_
void watchPreModuleConstruction(PreModuleConstruction::slot_type const &iSlot)
void postModuleConstruction(edm::ModuleDescription const &)
void watchPreModuleDestruction(PreModuleDestruction::slot_type const &iSlot)
unsigned currentModuleId_
void preallocate(edm::service::SystemBounds const &)
Log< level::Error, false > LogError
TritonService(const edm::ParameterSet &pset, edm::ActivityRegistry &areg)
static const std::string fallbackAddress
static std::string to_string(const XMLCh *ch)
void addModel(const std::string &modelName, const std::string &path)
FallbackOpts fallbackOpts_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
def unique(seq, keepstr=True)
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void printFallbackServerLog() const
def pipe(cmdline, input=None)
#define TRITON_THROW_IF_ERROR(X, MSG, NOTIFY)
void preModuleConstruction(edm::ModuleDescription const &)
std::string createGlobalIdentifier(bool binary=false)
Log< level::Info, false > LogInfo
void preModuleDestruction(edm::ModuleDescription const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::unordered_map< unsigned, Module > modules_
std::atomic< int > callFails_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
ParameterSet const & getParameterSet(ParameterSetID const &id)
Server serverInfo(const std::string &model, const std::string &preferred="") const
Log< level::Warning, false > LogWarning
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
std::unordered_map< std::string, Server > servers_
std::string getEnvironmentVariable(std::string const &name, std::string const &defaultValue=std::string())
static const std::string fallbackName