CMS 3D CMS Logo

TritonService.cc
Go to the documentation of this file.
3 
12 
13 #include "grpc_client.h"
14 #include "grpc_service.pb.h"
15 
16 #include <cstdio>
17 #include <cstdlib>
18 #include <filesystem>
19 #include <fstream>
20 #include <utility>
21 #include <tuple>
22 #include <unistd.h>
23 
24 namespace tc = triton::client;
25 
28 
29 namespace {
30  std::pair<std::string, int> execSys(const std::string& cmd) {
31  //redirect stderr to stdout
32  auto pipe = popen((cmd + " 2>&1").c_str(), "r");
33  int thisErrno = errno;
34  if (!pipe)
35  throw cms::Exception("SystemError")
36  << "TritonService: popen() failed with errno " << thisErrno << " for command: " << cmd;
37 
38  //extract output
39  constexpr static unsigned buffSize = 128;
40  std::array<char, buffSize> buffer;
42  while (!feof(pipe)) {
43  if (fgets(buffer.data(), buffSize, pipe))
44  result += buffer.data();
45  else {
46  thisErrno = ferror(pipe);
47  if (thisErrno)
48  throw cms::Exception("SystemError")
49  << "TritonService: failed reading command output with errno " << thisErrno;
50  }
51  }
52 
53  int rv = pclose(pipe);
54  return std::make_pair(result, rv);
55  }
56 } // namespace
57 
59  : verbose_(pset.getUntrackedParameter<bool>("verbose")),
60  fallbackOpts_(pset.getParameterSet("fallback")),
61  currentModuleId_(0),
62  allowAddModel_(false),
63  startedFallback_(false),
64  callFails_(0),
65  pid_(std::to_string(::getpid())) {
66  //module construction is assumed to be serial (correct at the time this code was written)
67 
69 
73  //fallback server will be launched (if needed) before beginJob
76 
77  //include fallback server in set if enabled
78  if (fallbackOpts_.enable) {
79  auto serverType = TritonServerType::Remote;
80  if (!fallbackOpts_.useGPU)
81  serverType = TritonServerType::LocalCPU;
82 #ifdef TRITON_ENABLE_GPU
83  else
84  serverType = TritonServerType::LocalGPU;
85 #endif
86 
87  servers_.emplace(std::piecewise_construct,
88  std::forward_as_tuple(Server::fallbackName),
89  std::forward_as_tuple(Server::fallbackName, Server::fallbackAddress, serverType));
90  }
91 
92  //loop over input servers: check which models they have
94  if (verbose_)
95  msg = "List of models for each server:\n";
96  for (const auto& serverPset : pset.getUntrackedParameterSetVector("servers")) {
97  const std::string& serverName(serverPset.getUntrackedParameter<std::string>("name"));
98  //ensure uniqueness
99  auto [sit, unique] = servers_.emplace(serverName, serverPset);
100  if (!unique)
101  throw cms::Exception("DuplicateServer")
102  << "TritonService: Not allowed to specify more than one server with same name (" << serverName << ")";
103  auto& server(sit->second);
104 
105  std::unique_ptr<tc::InferenceServerGrpcClient> client;
107  tc::InferenceServerGrpcClient::Create(&client, server.url, false, server.useSsl, server.sslOptions),
108  "TritonService(): unable to create inference context for " + serverName + " (" + server.url + ")",
109  false);
110 
111  if (verbose_) {
112  inference::ServerMetadataResponse serverMetaResponse;
113  TRITON_THROW_IF_ERROR(client->ServerMetadata(&serverMetaResponse),
114  "TritonService(): unable to get metadata for " + serverName + " (" + server.url + ")",
115  false);
116  edm::LogInfo("TritonService") << "Server " << serverName << ": url = " << server.url
117  << ", version = " << serverMetaResponse.version();
118  }
119 
120  inference::RepositoryIndexResponse repoIndexResponse;
121  TRITON_THROW_IF_ERROR(client->ModelRepositoryIndex(&repoIndexResponse),
122  "TritonService(): unable to get repository index for " + serverName + " (" + server.url + ")",
123  false);
124 
125  //servers keep track of models and vice versa
126  if (verbose_)
127  msg += serverName + ": ";
128  for (const auto& modelIndex : repoIndexResponse.models()) {
129  const auto& modelName = modelIndex.name();
130  auto mit = models_.find(modelName);
131  if (mit == models_.end())
132  mit = models_.emplace(modelName, "").first;
133  auto& modelInfo(mit->second);
134  modelInfo.servers.insert(serverName);
135  server.models.insert(modelName);
136  if (verbose_)
137  msg += modelName + ", ";
138  }
139  if (verbose_)
140  msg += "\n";
141  }
142  if (verbose_)
143  edm::LogInfo("TritonService") << msg;
144 }
145 
147  numberOfThreads_ = bounds.maxNumberOfThreads();
148 }
149 
151  currentModuleId_ = desc.id();
152  allowAddModel_ = true;
153 }
154 
156  //should only be called in module constructors
157  if (!allowAddModel_)
158  throw cms::Exception("DisallowedAddModel")
159  << "TritonService: Attempt to call addModel() outside of module constructors";
160  //if model is not in the list, then no specified server provides it
161  auto mit = models_.find(modelName);
162  if (mit == models_.end()) {
163  auto& modelInfo(unservedModels_.emplace(modelName, path).first->second);
164  modelInfo.modules.insert(currentModuleId_);
165  //only keep track of modules that need unserved models
167  }
168 }
169 
171 
173  //remove destructed modules from unserved list
174  if (unservedModels_.empty())
175  return;
176  auto id = desc.id();
177  auto oit = modules_.find(id);
178  if (oit != modules_.end()) {
179  const auto& moduleInfo(oit->second);
180  auto mit = unservedModels_.find(moduleInfo.model);
181  if (mit != unservedModels_.end()) {
182  auto& modelInfo(mit->second);
183  modelInfo.modules.erase(id);
184  //remove a model if it is no longer needed by any modules
185  if (modelInfo.modules.empty())
186  unservedModels_.erase(mit);
187  }
188  modules_.erase(oit);
189  }
190 }
191 
192 //returns the server chosen for the model: the preferred server if it provides the model, otherwise the first server in the model's set
194  auto mit = models_.find(model);
195  if (mit == models_.end())
196  throw cms::Exception("MissingModel") << "TritonService: There are no servers that provide model " << model;
197  const auto& modelInfo(mit->second);
198  const auto& modelServers = modelInfo.servers;
199 
200  auto msit = modelServers.end();
201  if (!preferred.empty()) {
202  msit = modelServers.find(preferred);
203  //todo: add a "strict" parameter to stop execution if preferred server isn't found?
204  if (msit == modelServers.end())
205  edm::LogWarning("PreferredServer") << "Preferred server " << preferred << " for model " << model
206  << " not available, will choose another server";
207  }
208  const auto& serverName(msit == modelServers.end() ? *modelServers.begin() : preferred);
209 
210  //todo: use some algorithm to select server rather than just picking arbitrarily
211  const auto& server(servers_.find(serverName)->second);
212  return server;
213 }
214 
216  //only need fallback if there are unserved models
217  if (!fallbackOpts_.enable or unservedModels_.empty())
218  return;
219 
221  if (verbose_)
222  msg = "List of models for fallback server: ";
223  //all unserved models are provided by fallback server
224  auto& server(servers_.find(Server::fallbackName)->second);
225  for (const auto& [modelName, model] : unservedModels_) {
226  auto& modelInfo(models_.emplace(modelName, model).first->second);
227  modelInfo.servers.insert(Server::fallbackName);
228  server.models.insert(modelName);
229  if (verbose_)
230  msg += modelName + ", ";
231  }
232  if (verbose_)
233  edm::LogInfo("TritonService") << msg;
234 
235  //assemble server start command
236  fallbackOpts_.command = "cmsTriton -P -1 -p " + pid_;
237  if (fallbackOpts_.debug)
238  fallbackOpts_.command += " -c";
240  fallbackOpts_.command += " -v";
242  fallbackOpts_.command += " -d";
243  if (fallbackOpts_.useGPU)
244  fallbackOpts_.command += " -g";
245  if (!fallbackOpts_.instanceName.empty())
247  if (fallbackOpts_.retries >= 0)
249  if (fallbackOpts_.wait >= 0)
251  for (const auto& [modelName, model] : unservedModels_) {
252  fallbackOpts_.command += " -m " + model.path;
253  }
254  std::string thread_string = " -I " + std::to_string(numberOfThreads_);
255  fallbackOpts_.command += thread_string;
256  if (!fallbackOpts_.imageName.empty())
258  if (!fallbackOpts_.sandboxName.empty())
260  //don't need this anymore
261  unservedModels_.clear();
262 
263  //get a random temporary directory if none specified
264  if (fallbackOpts_.tempDir.empty()) {
265  auto tmp_dir_path{std::filesystem::temp_directory_path() /= edm::createGlobalIdentifier()};
266  fallbackOpts_.tempDir = tmp_dir_path.string();
267  }
268  //special case ".": use script default (temp dir = .$instanceName)
269  if (fallbackOpts_.tempDir != ".")
271 
273 
274  if (fallbackOpts_.debug)
275  edm::LogInfo("TritonService") << "Fallback server temporary directory: " << fallbackOpts_.tempDir;
276  if (verbose_)
277  edm::LogInfo("TritonService") << command;
278 
279  //mark as started before executing in case of ctrl+c while command is running
280  startedFallback_ = true;
281  const auto& [output, rv] = execSys(command);
282  if (rv != 0) {
283  edm::LogError("TritonService") << output;
284  printFallbackServerLog<edm::LogError>();
285  throw cms::Exception("FallbackFailed")
286  << "TritonService: Starting the fallback server failed with exit code " << rv;
287  } else if (verbose_)
288  edm::LogInfo("TritonService") << output;
289  //get the port
290  const std::string& portIndicator("CMS_TRITON_GRPC_PORT: ");
291  //find last instance in log in case multiple ports were tried
292  auto pos = output.rfind(portIndicator);
293  if (pos != std::string::npos) {
294  auto pos2 = pos + portIndicator.size();
295  auto pos3 = output.find('\n', pos2);
296  const auto& portNum = output.substr(pos2, pos3 - pos2);
297  server.url += ":" + portNum;
298  } else
299  throw cms::Exception("FallbackFailed") << "TritonService: Unknown port for fallback server, log follows:\n"
300  << output;
301 }
302 
304  if (status)
305  --callFails_;
306  else
307  ++callFails_;
308 }
309 
311  if (!startedFallback_)
312  return;
313 
315  //prevent log cleanup during server stop
316  if (callFails_ > 0)
317  command += " -c";
318  command += " stop";
319  if (verbose_)
320  edm::LogInfo("TritonService") << command;
321 
322  const auto& [output, rv] = execSys(command);
323  if (rv != 0 or callFails_ > 0) {
324  //print logs if cmsRun is currently exiting because of a TritonException
325  edm::LogError("TritonService") << output;
326  printFallbackServerLog<edm::LogError>();
327  if (rv != 0) {
328  std::string stopCat("FallbackFailed");
329  std::string stopMsg = fmt::format("TritonService: Stopping the fallback server failed with exit code {}", rv);
330  //avoid throwing if the stack is already unwinding
331  if (callFails_ > 0)
332  edm::LogWarning(stopCat) << stopMsg;
333  else
334  throw cms::Exception(stopCat) << stopMsg;
335  }
336  } else if (verbose_) {
337  edm::LogInfo("TritonService") << output;
338  printFallbackServerLog<edm::LogInfo>();
339  }
340 }
341 
342 template <typename LOG>
344  std::vector<std::string> logNames{"log_" + fallbackOpts_.instanceName + ".log"};
345  //cmsTriton script moves log from temp to current dir in verbose mode or in some cases when auto_stop is called
346  // -> check both places
347  logNames.push_back(fallbackOpts_.tempDir + "/" + logNames[0]);
348  bool foundLog = false;
349  for (const auto& logName : logNames) {
350  std::ifstream infile(logName);
351  if (infile.is_open()) {
352  LOG("TritonService") << "TritonService: server log " << logName << "\n" << infile.rdbuf();
353  foundLog = true;
354  break;
355  }
356  }
357  if (!foundLog)
358  LOG("TritonService") << "TritonService: could not find server log " << logNames[0] << " in current directory or "
360 }
361 
364  desc.addUntracked<bool>("verbose", false);
365 
367  validator.addUntracked<std::string>("name");
368  validator.addUntracked<std::string>("address");
369  validator.addUntracked<unsigned>("port");
370  validator.addUntracked<bool>("useSsl", false);
371  validator.addUntracked<std::string>("rootCertificates", "");
372  validator.addUntracked<std::string>("privateKey", "");
373  validator.addUntracked<std::string>("certificateChain", "");
374 
375  desc.addVPSetUntracked("servers", validator, {});
376 
377  edm::ParameterSetDescription fallbackDesc;
378  fallbackDesc.addUntracked<bool>("enable", false);
379  fallbackDesc.addUntracked<bool>("debug", false);
380  fallbackDesc.addUntracked<bool>("verbose", false);
381  fallbackDesc.addUntracked<bool>("useDocker", false);
382  fallbackDesc.addUntracked<bool>("useGPU", false);
383  fallbackDesc.addUntracked<int>("retries", -1);
384  fallbackDesc.addUntracked<int>("wait", -1);
385  fallbackDesc.addUntracked<std::string>("instanceBaseName", "triton_server_instance");
386  fallbackDesc.addUntracked<std::string>("instanceName", "");
387  fallbackDesc.addUntracked<std::string>("tempDir", "");
388  fallbackDesc.addUntracked<std::string>("imageName", "");
389  fallbackDesc.addUntracked<std::string>("sandboxName", "");
390  desc.add<edm::ParameterSetDescription>("fallback", fallbackDesc);
391 
392  descriptions.addWithDefaultLabel(desc);
393 }
void watchPostModuleConstruction(PostModuleConstruction::slot_type const &iSlot)
void addWithDefaultLabel(ParameterSetDescription const &psetDescription)
std::unordered_map< std::string, Model > models_
void watchPreallocate(Preallocate::slot_type const &iSlot)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
void notifyCallStatus(bool status) const
std::unordered_map< std::string, Model > unservedModels_
void watchPreModuleConstruction(PreModuleConstruction::slot_type const &iSlot)
void postModuleConstruction(edm::ModuleDescription const &)
void watchPreModuleDestruction(PreModuleDestruction::slot_type const &iSlot)
unsigned currentModuleId_
void preallocate(edm::service::SystemBounds const &)
#define LOG(A)
Log< level::Error, false > LogError
TritonService(const edm::ParameterSet &pset, edm::ActivityRegistry &areg)
static const std::string fallbackAddress
Definition: TritonService.h:91
static std::string to_string(const XMLCh *ch)
void addModel(const std::string &modelName, const std::string &path)
FallbackOpts fallbackOpts_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
std::string pid_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void printFallbackServerLog() const
def pipe(cmdline, input=None)
Definition: pipe.py:5
#define TRITON_THROW_IF_ERROR(X, MSG, NOTIFY)
Definition: triton_utils.h:78
void preModuleConstruction(edm::ModuleDescription const &)
std::string createGlobalIdentifier(bool binary=false)
Log< level::Info, false > LogInfo
void preModuleDestruction(edm::ModuleDescription const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::unordered_map< unsigned, Module > modules_
std::atomic< int > callFails_
tuple msg
Definition: mps_check.py:286
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
ParameterSet const & getParameterSet(ParameterSetID const &id)
list command
Definition: mps_check.py:25
Server serverInfo(const std::string &model, const std::string &preferred="") const
list cmd
Definition: mps_setup.py:244
Definition: output.py:1
Log< level::Warning, false > LogWarning
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
Definition: pipe.py:1
std::unordered_map< std::string, Server > servers_
static const std::string fallbackName
Definition: TritonService.h:90