
TritonService.cc
#include "HeterogeneousCore/SonicTriton/interface/TritonService.h"
#include "HeterogeneousCore/SonicTriton/interface/triton_utils.h"

#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/GlobalIdentifier.h"

#include "grpc_client.h"
#include "grpc_service.pb.h"

#include <cstdio>
#include <cstdlib>
#include <filesystem>
#include <utility>
#include <tuple>
#include <unistd.h>

namespace tc = triton::client;

namespace {
  std::pair<std::string, int> execSys(const std::string& cmd) {
    //redirect stderr to stdout
    auto pipe = popen((cmd + " 2>&1").c_str(), "r");
    int thisErrno = errno;
    if (!pipe)
      throw cms::Exception("SystemError") << "popen() failed with errno " << thisErrno << " for command: " << cmd;

    //extract output
    constexpr static unsigned buffSize = 128;
    std::array<char, buffSize> buffer;
    std::string result;
    while (!feof(pipe)) {
      if (fgets(buffer.data(), buffSize, pipe))
        result += buffer.data();
      else {
        thisErrno = ferror(pipe);
        if (thisErrno)
          throw cms::Exception("SystemError") << "failed reading command output with errno " << thisErrno;
      }
    }

    int rv = pclose(pipe);
    return std::make_pair(result, rv);
  }
}  // namespace
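
// Hypothetical illustration (not part of the original file) of how the execSys()
// helper above is consumed: the first pair element is the combined stdout+stderr
// text, the second is the status returned by pclose(). The command is a placeholder.
[[maybe_unused]] static void execSysDemo() {
  const auto& [output, exitCode] = execSys("echo hello");
  if (exitCode != 0)
    throw cms::Exception("ExecSysDemo") << "command failed with output:\n" << output;
}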

TritonService::TritonService(const edm::ParameterSet& pset, edm::ActivityRegistry& areg)
    : verbose_(pset.getUntrackedParameter<bool>("verbose")),
      fallbackOpts_(pset.getParameterSet("fallback")),
      currentModuleId_(0),
      allowAddModel_(false),
      startedFallback_(false),
      pid_(std::to_string(::getpid())) {
  //module construction is assumed to be serial (correct at the time this code was written)
  areg.watchPreModuleConstruction(this, &TritonService::preModuleConstruction);
  areg.watchPostModuleConstruction(this, &TritonService::postModuleConstruction);
  areg.watchPreModuleDestruction(this, &TritonService::preModuleDestruction);
  //fallback server will be launched (if needed) before beginJob
  areg.watchPreBeginJob(this, &TritonService::preBeginJob);

  //include fallback server in set if enabled
  if (fallbackOpts_.enable) {
    auto serverType = TritonServerType::Remote;
    if (!fallbackOpts_.useGPU)
      serverType = TritonServerType::LocalCPU;
#ifdef TRITON_ENABLE_GPU
    else
      serverType = TritonServerType::LocalGPU;
#endif

    servers_.emplace(std::piecewise_construct,
                     std::forward_as_tuple(Server::fallbackName),
                     std::forward_as_tuple(Server::fallbackName, Server::fallbackAddress, serverType));
  }

  //loop over input servers: check which models they have
  std::string msg;
  if (verbose_)
    msg = "List of models for each server:\n";
  for (const auto& serverPset : pset.getUntrackedParameterSetVector("servers")) {
    const std::string& serverName(serverPset.getUntrackedParameter<std::string>("name"));
    //ensure uniqueness
    auto [sit, unique] = servers_.emplace(serverName, serverPset);
    if (!unique)
      throw cms::Exception("DuplicateServer")
          << "Not allowed to specify more than one server with same name (" << serverName << ")";
    auto& server(sit->second);

    std::unique_ptr<tc::InferenceServerGrpcClient> client;
    triton_utils::throwIfError(
        tc::InferenceServerGrpcClient::Create(&client, server.url, false, server.useSsl, server.sslOptions),
        "TritonService(): unable to create inference context for " + serverName + " (" + server.url + ")");

    if (verbose_) {
      inference::ServerMetadataResponse serverMetaResponse;
      triton_utils::throwIfError(client->ServerMetadata(&serverMetaResponse),
                                 "TritonService(): unable to get metadata for " + serverName + " (" + server.url + ")");
      edm::LogInfo("TritonService") << "Server " << serverName << ": url = " << server.url
                                    << ", version = " << serverMetaResponse.version();
    }

    inference::RepositoryIndexResponse repoIndexResponse;
    triton_utils::throwIfError(
        client->ModelRepositoryIndex(&repoIndexResponse),
        "TritonService(): unable to get repository index for " + serverName + " (" + server.url + ")");

    //servers keep track of models and vice versa
    if (verbose_)
      msg += serverName + ": ";
    for (const auto& modelIndex : repoIndexResponse.models()) {
      const auto& modelName = modelIndex.name();
      auto mit = models_.find(modelName);
      if (mit == models_.end())
        mit = models_.emplace(modelName, "").first;
      auto& modelInfo(mit->second);
      modelInfo.servers.insert(serverName);
      server.models.insert(modelName);
      if (verbose_)
        msg += modelName + ", ";
    }
    if (verbose_)
      msg += "\n";
  }
  if (verbose_)
    edm::LogInfo("TritonService") << msg;
}

void TritonService::preModuleConstruction(edm::ModuleDescription const& desc) {
  currentModuleId_ = desc.id();
  allowAddModel_ = true;
}

void TritonService::addModel(const std::string& modelName, const std::string& path) {
  //should only be called in module constructors
  if (!allowAddModel_)
    throw cms::Exception("DisallowedAddModel") << "Attempt to call addModel() outside of module constructors";
  //if model is not in the list, then no specified server provides it
  auto mit = models_.find(modelName);
  if (mit == models_.end()) {
    auto& modelInfo(unservedModels_.emplace(modelName, path).first->second);
    modelInfo.modules.insert(currentModuleId_);
    //only keep track of modules that need unserved models
    modules_.emplace(currentModuleId_, modelName);
  }
}
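
// Hypothetical sketch (not part of the original file): the call a SONIC module makes
// during its construction to register the model it needs, so that an unserved model
// can later be picked up by the fallback server. The model name and path are
// illustrative placeholders; TritonClient performs the equivalent call internally.
[[maybe_unused]] static void addModelDemo(TritonService& service) {
  //only allowed while a module is being constructed (see the check above)
  service.addModel("someModel", "/path/to/model/repository/someModel");
}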

void TritonService::postModuleConstruction(edm::ModuleDescription const&) { allowAddModel_ = false; }

void TritonService::preModuleDestruction(edm::ModuleDescription const& desc) {
  //remove destructed modules from unserved list
  if (unservedModels_.empty())
    return;
  auto id = desc.id();
  auto oit = modules_.find(id);
  if (oit != modules_.end()) {
    const auto& moduleInfo(oit->second);
    auto mit = unservedModels_.find(moduleInfo.model);
    if (mit != unservedModels_.end()) {
      auto& modelInfo(mit->second);
      modelInfo.modules.erase(id);
      //remove a model if it is no longer needed by any modules
      if (modelInfo.modules.empty())
        unservedModels_.erase(mit);
    }
    modules_.erase(oit);
  }
}

//second return value is only true if fallback CPU server is being used
TritonService::Server TritonService::serverInfo(const std::string& model, const std::string& preferred) const {
  auto mit = models_.find(model);
  if (mit == models_.end())
    throw cms::Exception("MissingModel") << "There are no servers that provide model " << model;
  const auto& modelInfo(mit->second);
  const auto& modelServers = modelInfo.servers;

  auto msit = modelServers.end();
  if (!preferred.empty()) {
    msit = modelServers.find(preferred);
    //todo: add a "strict" parameter to stop execution if preferred server isn't found?
    if (msit == modelServers.end())
      edm::LogWarning("PreferredServer") << "Preferred server " << preferred << " for model " << model
                                         << " not available, will choose another server";
  }
  const auto& serverName(msit == modelServers.end() ? *modelServers.begin() : preferred);

  //todo: use some algorithm to select server rather than just picking arbitrarily
  const auto& server(servers_.find(serverName)->second);
  return server;
}
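
// Hypothetical sketch (not part of the original file): resolving which server will
// provide a given model, optionally preferring a server named in the client
// configuration. The model and server names are illustrative placeholders.
[[maybe_unused]] static void serverLookupDemo(const TritonService& service) {
  const auto& server = service.serverInfo("someModel", "preferredServer");
  edm::LogInfo("TritonServiceDemo") << "model will be served at " << server.url;
}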

void TritonService::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&) {
  //only need fallback if there are unserved models
  if (!fallbackOpts_.enable or unservedModels_.empty())
    return;

  std::string msg;
  if (verbose_)
    msg = "List of models for fallback server: ";
  //all unserved models are provided by fallback server
  auto& server(servers_.find(Server::fallbackName)->second);
  for (const auto& [modelName, model] : unservedModels_) {
    auto& modelInfo(models_.emplace(modelName, model).first->second);
    modelInfo.servers.insert(Server::fallbackName);
    server.models.insert(modelName);
    if (verbose_)
      msg += modelName + ", ";
  }
  if (verbose_)
    edm::LogInfo("TritonService") << msg;

  //randomize instance name
  if (fallbackOpts_.instanceName.empty()) {
    fallbackOpts_.instanceName = "triton_server_instance_" + edm::createGlobalIdentifier();
  }

  //assemble server start command
  std::string command("cmsTriton -P -1 -p " + pid_);
  if (fallbackOpts_.debug)
    command += " -c";
  if (fallbackOpts_.verbose)
    command += " -v";
  if (fallbackOpts_.useDocker)
    command += " -d";
  if (fallbackOpts_.useGPU)
    command += " -g";
  if (!fallbackOpts_.instanceName.empty())
    command += " -n " + fallbackOpts_.instanceName;
  if (fallbackOpts_.retries >= 0)
    command += " -r " + std::to_string(fallbackOpts_.retries);
  if (fallbackOpts_.wait >= 0)
    command += " -w " + std::to_string(fallbackOpts_.wait);
  for (const auto& [modelName, model] : unservedModels_) {
    command += " -m " + model.path;
  }
  if (!fallbackOpts_.imageName.empty())
    command += " -i " + fallbackOpts_.imageName;
  if (!fallbackOpts_.sandboxName.empty())
    command += " -s " + fallbackOpts_.sandboxName;
  //don't need this anymore
  unservedModels_.clear();

  //get a random temporary directory if none specified
  if (fallbackOpts_.tempDir.empty()) {
    auto tmp_dir_path{std::filesystem::temp_directory_path() /= edm::createGlobalIdentifier()};
    fallbackOpts_.tempDir = tmp_dir_path.string();
  }
  //special case ".": use script default (temp dir = .$instanceName)
  if (fallbackOpts_.tempDir != ".")
    command += " -t " + fallbackOpts_.tempDir;

  command += " start";

  if (fallbackOpts_.debug)
    edm::LogInfo("TritonService") << "Fallback server temporary directory: " << fallbackOpts_.tempDir;
  if (verbose_)
    edm::LogInfo("TritonService") << command;

  //mark as started before executing in case of ctrl+c while command is running
  startedFallback_ = true;
  const auto& [output, rv] = execSys(command);
  if (verbose_ or rv != 0)
    edm::LogInfo("TritonService") << output;
  if (rv != 0)
    throw cms::Exception("FallbackFailed") << "Starting the fallback server failed with exit code " << rv;

  //get the port
  const std::string& portIndicator("CMS_TRITON_GRPC_PORT: ");
  //find last instance in log in case multiple ports were tried
  auto pos = output.rfind(portIndicator);
  if (pos != std::string::npos) {
    auto pos2 = pos + portIndicator.size();
    auto pos3 = output.find('\n', pos2);
    const auto& portNum = output.substr(pos2, pos3 - pos2);
    server.url += ":" + portNum;
  } else
    throw cms::Exception("FallbackFailed") << "Unknown port for fallback server, log follows:\n" << output;
}
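
// For orientation (illustrative values only, not taken from a real job): with the
// fallback enabled on CPU, verbose script output, two retries, and one unserved model,
// the command assembled above would look roughly like
//   cmsTriton -P -1 -p 12345 -v -n triton_server_instance_<id> -r 2 \
//       -m /path/to/model/repository/someModel -t /tmp/<id> start
// The chosen gRPC port is then read back from the "CMS_TRITON_GRPC_PORT: " line
// printed by the script, as done just above.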

void TritonService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
  edm::ParameterSetDescription desc;
  desc.addUntracked<bool>("verbose", false);

  edm::ParameterSetDescription validator;
  validator.addUntracked<std::string>("name");
  validator.addUntracked<std::string>("address");
  validator.addUntracked<unsigned>("port");
  validator.addUntracked<bool>("useSsl", false);
  validator.addUntracked<std::string>("rootCertificates", "");
  validator.addUntracked<std::string>("privateKey", "");
  validator.addUntracked<std::string>("certificateChain", "");

  desc.addVPSetUntracked("servers", validator, {});

  edm::ParameterSetDescription fallbackDesc;
  fallbackDesc.addUntracked<bool>("enable", false);
  fallbackDesc.addUntracked<bool>("debug", false);
  fallbackDesc.addUntracked<bool>("verbose", false);
  fallbackDesc.addUntracked<bool>("useDocker", false);
  fallbackDesc.addUntracked<bool>("useGPU", false);
  fallbackDesc.addUntracked<int>("retries", -1);
  fallbackDesc.addUntracked<int>("wait", -1);
  fallbackDesc.addUntracked<std::string>("instanceName", "");
  fallbackDesc.addUntracked<std::string>("tempDir", "");
  fallbackDesc.addUntracked<std::string>("imageName", "");
  fallbackDesc.addUntracked<std::string>("sandboxName", "");
  desc.add<edm::ParameterSetDescription>("fallback", fallbackDesc);

  descriptions.addWithDefaultLabel(desc);
}
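
// The descriptions registered above correspond to a service configuration of roughly
// the following shape (a sketch in CMSSW Python configuration syntax; the server entry
// values are illustrative placeholders, and the defaults are those set in fillDescriptions):
//
//   process.TritonService = cms.Service("TritonService",
//       verbose = cms.untracked.bool(False),
//       fallback = cms.PSet(
//           enable = cms.untracked.bool(True),
//           useGPU = cms.untracked.bool(False),
//       ),
//       servers = cms.untracked.VPSet(
//           cms.PSet(
//               name = cms.untracked.string("someServer"),
//               address = cms.untracked.string("some.host"),
//               port = cms.untracked.uint32(8001),
//           ),
//       ),
//   )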