CMS 3D CMS Logo

TritonService.cc
Go to the documentation of this file.
3 
12 
13 #include "grpc_client.h"
14 #include "grpc_service.pb.h"
15 
16 #include <cstdio>
17 #include <cstdlib>
18 #include <filesystem>
19 #include <fstream>
20 #include <utility>
21 #include <tuple>
22 #include <unistd.h>
23 
24 namespace tc = triton::client;
25 
28 
29 namespace {
30  std::pair<std::string, int> execSys(const std::string& cmd) {
31  //redirect stderr to stdout
32  auto pipe = popen((cmd + " 2>&1").c_str(), "r");
33  int thisErrno = errno;
34  if (!pipe)
35  throw cms::Exception("SystemError")
36  << "TritonService: popen() failed with errno " << thisErrno << " for command: " << cmd;
37 
38  //extract output
39  constexpr static unsigned buffSize = 128;
40  std::array<char, buffSize> buffer;
42  while (!feof(pipe)) {
43  if (fgets(buffer.data(), buffSize, pipe))
44  result += buffer.data();
45  else {
46  thisErrno = ferror(pipe);
47  if (thisErrno)
48  throw cms::Exception("SystemError")
49  << "TritonService: failed reading command output with errno " << thisErrno;
50  }
51  }
52 
53  int rv = pclose(pipe);
54  return std::make_pair(result, rv);
55  }
56 } // namespace
57 
// TritonService constructor.
// NOTE(extraction): the signature line (TritonService::TritonService(const
// edm::ParameterSet&, edm::ActivityRegistry&)) and the ActivityRegistry
// watch registrations (original lines 58, 67-74) were lost in this dump --
// verify against the original TritonService.cc.
// Reads configuration, optionally registers the fallback server, then
// queries every configured Triton server over gRPC for its model repository.
59  : verbose_(pset.getUntrackedParameter<bool>("verbose")),
60  fallbackOpts_(pset.getParameterSet("fallback")),
61  currentModuleId_(0),
62  allowAddModel_(false),
63  startedFallback_(false),
64  pid_(std::to_string(::getpid())) {
65  //module construction is assumed to be serial (correct at the time this code was written)
66 
68 
72  //fallback server will be launched (if needed) before beginJob
75 
76  //include fallback server in set if enabled
77  if (fallbackOpts_.enable) {
// Server type defaults to Remote; downgraded to LocalCPU unless GPU use is
// requested, and LocalGPU only when compiled with TRITON_ENABLE_GPU.
78  auto serverType = TritonServerType::Remote;
79  if (!fallbackOpts_.useGPU)
80  serverType = TritonServerType::LocalCPU;
81 #ifdef TRITON_ENABLE_GPU
82  else
83  serverType = TritonServerType::LocalGPU;
84 #endif
85 
// piecewise_construct: the map key and the Server value are built in place.
86  servers_.emplace(std::piecewise_construct,
87  std::forward_as_tuple(Server::fallbackName),
88  std::forward_as_tuple(Server::fallbackName, Server::fallbackAddress, serverType));
89  }
90 
91  //loop over input servers: check which models they have
// NOTE(extraction): original line 92 (presumably `std::string msg;`) is
// missing here -- msg is used below without a visible declaration.
93  if (verbose_)
94  msg = "List of models for each server:\n";
95  for (const auto& serverPset : pset.getUntrackedParameterSetVector("servers")) {
96  const std::string& serverName(serverPset.getUntrackedParameter<std::string>("name"));
97  //ensure uniqueness
98  auto [sit, unique] = servers_.emplace(serverName, serverPset);
99  if (!unique)
100  throw cms::Exception("DuplicateServer")
101  << "TritonService: Not allowed to specify more than one server with same name (" << serverName << ")";
102  auto& server(sit->second);
103 
104  std::unique_ptr<tc::InferenceServerGrpcClient> client;
// NOTE(extraction): the TRITON_THROW_IF_ERROR( opening of this call (line
// 105) is missing from the dump.
106  tc::InferenceServerGrpcClient::Create(&client, server.url, false, server.useSsl, server.sslOptions),
107  "TritonService(): unable to create inference context for " + serverName + " (" + server.url + ")");
108 
// In verbose mode, log each server's reported version from its metadata.
109  if (verbose_) {
110  inference::ServerMetadataResponse serverMetaResponse;
111  TRITON_THROW_IF_ERROR(client->ServerMetadata(&serverMetaResponse),
112  "TritonService(): unable to get metadata for " + serverName + " (" + server.url + ")");
113  edm::LogInfo("TritonService") << "Server " << serverName << ": url = " << server.url
114  << ", version = " << serverMetaResponse.version();
115  }
116 
// Ask the server which models it serves.
// NOTE(extraction): the TRITON_THROW_IF_ERROR( opening (line 118) is missing.
117  inference::RepositoryIndexResponse repoIndexResponse;
119  client->ModelRepositoryIndex(&repoIndexResponse),
120  "TritonService(): unable to get repository index for " + serverName + " (" + server.url + ")");
121 
122  //servers keep track of models and vice versa
123  if (verbose_)
124  msg += serverName + ": ";
125  for (const auto& modelIndex : repoIndexResponse.models()) {
126  const auto& modelName = modelIndex.name();
// Insert the model with an empty path if not yet known, then cross-link
// model <-> server membership.
127  auto mit = models_.find(modelName);
128  if (mit == models_.end())
129  mit = models_.emplace(modelName, "").first;
130  auto& modelInfo(mit->second);
131  modelInfo.servers.insert(serverName);
132  server.models.insert(modelName);
133  if (verbose_)
134  msg += modelName + ", ";
135  }
136  if (verbose_)
137  msg += "\n";
138  }
139  if (verbose_)
140  edm::LogInfo("TritonService") << msg;
141 }
142 
// preallocate callback (signature line 143 lost in extraction).
// Records the framework's max thread count; used later to build the
// fallback server start command (" -I <n>") in preBeginJob.
144  numberOfThreads_ = bounds.maxNumberOfThreads();
145 }
146 
// preModuleConstruction callback (signature line 147 lost in extraction).
// Remembers which module is currently being constructed and opens the
// window during which addModel() calls are permitted.
148  currentModuleId_ = desc.id();
149  allowAddModel_ = true;
150 }
151 
// addModel (signature line 152 lost in extraction).
// Registers a model needed by the module currently under construction.
// Models not provided by any configured server are queued as "unserved"
// so the fallback server can pick them up later.
153  //should only be called in module constructors
154  if (!allowAddModel_)
155  throw cms::Exception("DisallowedAddModel")
156  << "TritonService: Attempt to call addModel() outside of module constructors";
157  //if model is not in the list, then no specified server provides it
158  auto mit = models_.find(modelName);
159  if (mit == models_.end()) {
160  auto& modelInfo(unservedModels_.emplace(modelName, path).first->second);
161  modelInfo.modules.insert(currentModuleId_);
162  //only keep track of modules that need unserved models
// NOTE(extraction): original line 163 is missing here -- presumably the
// modules_ bookkeeping insert (verify against the original source).
164  }
165 }
166 
168 
// preModuleDestruction callback (signature line 169 lost in extraction).
// Drops the destructed module from the bookkeeping; an unserved model is
// removed entirely once its last requesting module is gone.
170  //remove destructed modules from unserved list
171  if (unservedModels_.empty())
172  return;
173  auto id = desc.id();
174  auto oit = modules_.find(id);
175  if (oit != modules_.end()) {
176  const auto& moduleInfo(oit->second);
177  auto mit = unservedModels_.find(moduleInfo.model);
178  if (mit != unservedModels_.end()) {
179  auto& modelInfo(mit->second);
180  modelInfo.modules.erase(id);
181  //remove a model if it is no longer needed by any modules
182  if (modelInfo.modules.empty())
183  unservedModels_.erase(mit);
184  }
185  modules_.erase(oit);
186  }
187 }
188 
189 //second return value is only true if fallback CPU server is being used
// serverInfo (signature line 190 lost in extraction; per the member index,
// the interface is: Server serverInfo(const std::string& model,
// const std::string& preferred = "") const).
// Picks a server that provides `model`, honoring `preferred` if available;
// throws MissingModel if no server provides it.
191  auto mit = models_.find(model);
192  if (mit == models_.end())
193  throw cms::Exception("MissingModel") << "TritonService: There are no servers that provide model " << model;
194  const auto& modelInfo(mit->second);
195  const auto& modelServers = modelInfo.servers;
196 
197  auto msit = modelServers.end();
198  if (!preferred.empty()) {
199  msit = modelServers.find(preferred);
200  //todo: add a "strict" parameter to stop execution if preferred server isn't found?
201  if (msit == modelServers.end())
202  edm::LogWarning("PreferredServer") << "Preferred server " << preferred << " for model " << model
203  << " not available, will choose another server";
204  }
// Fall back to the first provider when the preferred server was not found.
205  const auto& serverName(msit == modelServers.end() ? *modelServers.begin() : preferred);
206 
207  //todo: use some algorithm to select server rather than just picking arbitrarily
208  const auto& server(servers_.find(serverName)->second);
209  return server;
210 }
211 
// preBeginJob callback (signature line 212 lost in extraction).
// If any requested models are not served by a configured server, assembles
// and runs the `cmsTriton` script to launch a local fallback server, then
// parses the chosen gRPC port out of the script's output.
213  //only need fallback if there are unserved models
214  if (!fallbackOpts_.enable or unservedModels_.empty())
215  return;
216 
// NOTE(extraction): original line 217 (presumably `std::string msg;`) is
// missing -- msg is used below without a visible declaration.
218  if (verbose_)
219  msg = "List of models for fallback server: ";
220  //all unserved models are provided by fallback server
221  auto& server(servers_.find(Server::fallbackName)->second);
222  for (const auto& [modelName, model] : unservedModels_) {
223  auto& modelInfo(models_.emplace(modelName, model).first->second);
224  modelInfo.servers.insert(Server::fallbackName);
225  server.models.insert(modelName);
226  if (verbose_)
227  msg += modelName + ", ";
228  }
229  if (verbose_)
230  edm::LogInfo("TritonService") << msg;
231 
232  //assemble server start command
// Flags: -P -1 = let the script pick a port; -p = watch this process's pid.
233  fallbackOpts_.command = "cmsTriton -P -1 -p " + pid_;
234  if (fallbackOpts_.debug)
235  fallbackOpts_.command += " -c";
// NOTE(extraction): the `if (...)` guards for the -v and -d appends
// (original lines 236 and 238) are missing from this dump.
237  fallbackOpts_.command += " -v";
239  fallbackOpts_.command += " -d";
240  if (fallbackOpts_.useGPU)
241  fallbackOpts_.command += " -g";
// NOTE(extraction): the command-append bodies for instanceName, retries and
// wait (original lines 243, 245, 247) are missing.
242  if (!fallbackOpts_.instanceName.empty())
244  if (fallbackOpts_.retries >= 0)
246  if (fallbackOpts_.wait >= 0)
248  for (const auto& [modelName, model] : unservedModels_) {
249  fallbackOpts_.command += " -m " + model.path;
250  }
// Pass the framework thread count through to the server script.
251  std::string thread_string = " -I " + std::to_string(numberOfThreads_);
252  fallbackOpts_.command += thread_string;
// NOTE(extraction): the command-append bodies for imageName and sandboxName
// (original lines 254, 256) are missing.
253  if (!fallbackOpts_.imageName.empty())
255  if (!fallbackOpts_.sandboxName.empty())
257  //don't need this anymore
258  unservedModels_.clear();
259 
260  //get a random temporary directory if none specified
261  if (fallbackOpts_.tempDir.empty()) {
262  auto tmp_dir_path{std::filesystem::temp_directory_path() /= edm::createGlobalIdentifier()};
263  fallbackOpts_.tempDir = tmp_dir_path.string();
264  }
265  //special case ".": use script default (temp dir = .$instanceName)
// NOTE(extraction): the tempDir append (line 267) and the definition of the
// local `command` used below (line 269) are missing from this dump.
266  if (fallbackOpts_.tempDir != ".")
268 
270 
271  if (fallbackOpts_.debug)
272  edm::LogInfo("TritonService") << "Fallback server temporary directory: " << fallbackOpts_.tempDir;
273  if (verbose_)
274  edm::LogInfo("TritonService") << command;
275 
276  //mark as started before executing in case of ctrl+c while command is running
277  startedFallback_ = true;
278  const auto& [output, rv] = execSys(command);
279  if (rv != 0) {
// On failure: log the script output plus the server log, then abort the job.
280  edm::LogError("TritonService") << output;
281  printFallbackServerLog<edm::LogError>();
282  throw cms::Exception("FallbackFailed")
283  << "TritonService: Starting the fallback server failed with exit code " << rv;
284  } else if (verbose_)
285  edm::LogInfo("TritonService") << output;
286  //get the port
287  const std::string& portIndicator("CMS_TRITON_GRPC_PORT: ");
288  //find last instance in log in case multiple ports were tried
289  auto pos = output.rfind(portIndicator);
290  if (pos != std::string::npos) {
// Extract the port number: text between the indicator and end of line.
291  auto pos2 = pos + portIndicator.size();
292  auto pos3 = output.find('\n', pos2);
293  const auto& portNum = output.substr(pos2, pos3 - pos2);
294  server.url += ":" + portNum;
295  } else
296  throw cms::Exception("FallbackFailed") << "TritonService: Unknown port for fallback server, log follows:\n"
297  << output;
298 }
299 
// postEndJob callback (signature line 300 lost in extraction).
// Stops the fallback server if one was started; on failure logs the output
// and the server log and throws.
// NOTE(extraction): the definition of the local `command` (original line
// 304, presumably the cmsTriton stop command) is missing from this dump.
301  if (!startedFallback_)
302  return;
303 
305  if (verbose_)
306  edm::LogInfo("TritonService") << command;
307 
308  const auto& [output, rv] = execSys(command);
309  if (rv != 0) {
310  edm::LogError("TritonService") << output;
311  printFallbackServerLog<edm::LogError>();
312  throw cms::Exception("FallbackFailed")
313  << "TritonService: Stopping the fallback server failed with exit code " << rv;
314  } else if (verbose_) {
// In verbose mode also dump the server's own log at Info level.
315  edm::LogInfo("TritonService") << output;
316  printFallbackServerLog<edm::LogInfo>();
317  }
318 }
319 
320 template <typename LOG>
// printFallbackServerLog (signature line 321 lost in extraction; member
// index shows `void printFallbackServerLog() const`).  Emits the fallback
// server's log file through the given message-logger type (edm::LogInfo or
// edm::LogError), checking the current directory first, then tempDir.
322  std::vector<std::string> logNames{"log_" + fallbackOpts_.instanceName + ".log"};
323  //cmsTriton script moves log from temp to current dir in verbose mode or in some cases when auto_stop is called
324  // -> check both places
325  logNames.push_back(fallbackOpts_.tempDir + "/" + logNames[0]);
326  bool foundLog = false;
327  for (const auto& logName : logNames) {
328  std::ifstream infile(logName);
329  if (infile.is_open()) {
// Stream the whole file into the logger via rdbuf() and stop at the first
// log file found.
330  LOG("TritonService") << "TritonService: server log " << logName << "\n" << infile.rdbuf();
331  foundLog = true;
332  break;
333  }
334  }
335  if (!foundLog)
// NOTE(extraction): the tail of this message (original line 337, presumably
// the tempDir path) is missing from this dump.
336  LOG("TritonService") << "TritonService: could not find server log " << logNames[0] << " in current directory or "
338 }
339 
// fillDescriptions (static; signature line 340-341 lost in extraction).
// Declares the service's configuration: a top-level untracked "verbose"
// flag, a VPSet of server descriptions, and the "fallback" PSet controlling
// the local fallback server.
// NOTE(extraction): the declarations of `desc` and `validator`
// (edm::ParameterSetDescription, original lines 341 and 344) are missing.
342  desc.addUntracked<bool>("verbose", false);
343 
// Per-server parameters validated for each entry of the "servers" VPSet.
345  validator.addUntracked<std::string>("name");
346  validator.addUntracked<std::string>("address");
347  validator.addUntracked<unsigned>("port");
348  validator.addUntracked<bool>("useSsl", false);
349  validator.addUntracked<std::string>("rootCertificates", "");
350  validator.addUntracked<std::string>("privateKey", "");
351  validator.addUntracked<std::string>("certificateChain", "");
352 
// Default is an empty server list.
353  desc.addVPSetUntracked("servers", validator, {});
354 
// Fallback-server options; these mirror the cmsTriton script's flags used
// in preBeginJob (-c, -v, -d, -g, -r, -w, -n, -t, -i, -s, -I).
355  edm::ParameterSetDescription fallbackDesc;
356  fallbackDesc.addUntracked<bool>("enable", false);
357  fallbackDesc.addUntracked<bool>("debug", false);
358  fallbackDesc.addUntracked<bool>("verbose", false);
359  fallbackDesc.addUntracked<bool>("useDocker", false);
360  fallbackDesc.addUntracked<bool>("useGPU", false);
361  fallbackDesc.addUntracked<int>("retries", -1);
362  fallbackDesc.addUntracked<int>("wait", -1);
363  fallbackDesc.addUntracked<std::string>("instanceBaseName", "triton_server_instance");
364  fallbackDesc.addUntracked<std::string>("instanceName", "");
365  fallbackDesc.addUntracked<std::string>("tempDir", "");
366  fallbackDesc.addUntracked<std::string>("imageName", "");
367  fallbackDesc.addUntracked<std::string>("sandboxName", "");
368  desc.add<edm::ParameterSetDescription>("fallback", fallbackDesc);
369 
370  descriptions.addWithDefaultLabel(desc);
371 }
void watchPostModuleConstruction(PostModuleConstruction::slot_type const &iSlot)
void addWithDefaultLabel(ParameterSetDescription const &psetDescription)
std::unordered_map< std::string, Model > models_
void watchPreallocate(Preallocate::slot_type const &iSlot)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
std::unordered_map< std::string, Model > unservedModels_
#define TRITON_THROW_IF_ERROR(X, MSG)
Definition: triton_utils.h:75
void watchPreModuleConstruction(PreModuleConstruction::slot_type const &iSlot)
void postModuleConstruction(edm::ModuleDescription const &)
void watchPreModuleDestruction(PreModuleDestruction::slot_type const &iSlot)
unsigned currentModuleId_
void preallocate(edm::service::SystemBounds const &)
#define LOG(A)
Log< level::Error, false > LogError
TritonService(const edm::ParameterSet &pset, edm::ActivityRegistry &areg)
static const std::string fallbackAddress
Definition: TritonService.h:90
static std::string to_string(const XMLCh *ch)
void addModel(const std::string &modelName, const std::string &path)
FallbackOpts fallbackOpts_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
std::string pid_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void printFallbackServerLog() const
def pipe(cmdline, input=None)
Definition: pipe.py:5
void preModuleConstruction(edm::ModuleDescription const &)
std::string createGlobalIdentifier(bool binary=false)
Log< level::Info, false > LogInfo
void preModuleDestruction(edm::ModuleDescription const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::unordered_map< unsigned, Module > modules_
tuple msg
Definition: mps_check.py:286
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
ParameterSet const & getParameterSet(ParameterSetID const &id)
list command
Definition: mps_check.py:25
Server serverInfo(const std::string &model, const std::string &preferred="") const
list cmd
Definition: mps_setup.py:244
Definition: output.py:1
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
Definition: pipe.py:1
std::unordered_map< std::string, Server > servers_
static const std::string fallbackName
Definition: TritonService.h:89