CMS 3D CMS Logo

TritonService.cc
Go to the documentation of this file.
3 
11 
12 #include "grpc_client.h"
13 #include "grpc_service.pb.h"
14 
15 #include <cstdio>
16 #include <cstdlib>
17 #include <filesystem>
18 #include <fstream>
19 #include <utility>
20 #include <tuple>
21 #include <unistd.h>
22 
23 namespace tc = triton::client;
24 
27 
28 namespace {
29  std::pair<std::string, int> execSys(const std::string& cmd) {
30  //redirect stderr to stdout
31  auto pipe = popen((cmd + " 2>&1").c_str(), "r");
32  int thisErrno = errno;
33  if (!pipe)
34  throw cms::Exception("SystemError")
35  << "TritonService: popen() failed with errno " << thisErrno << " for command: " << cmd;
36 
37  //extract output
38  constexpr static unsigned buffSize = 128;
39  std::array<char, buffSize> buffer;
41  while (!feof(pipe)) {
42  if (fgets(buffer.data(), buffSize, pipe))
43  result += buffer.data();
44  else {
45  thisErrno = ferror(pipe);
46  if (thisErrno)
47  throw cms::Exception("SystemError")
48  << "TritonService: failed reading command output with errno " << thisErrno;
49  }
50  }
51 
52  int rv = pclose(pipe);
53  return std::make_pair(result, rv);
54  }
55 } // namespace
56 
// TritonService constructor. NOTE(review): the generator elided the signature
// line and several interior lines (gaps in the inline numbering) — presumably
// TritonService(const edm::ParameterSet& pset, edm::ActivityRegistry& areg)
// plus areg.watch*() registrations and a local msg/TRITON_THROW_IF_ERROR
// openers; confirm against the repository source.
// Visible behavior: initializes members from the pset, optionally registers the
// fallback server, then contacts each configured server over gRPC to learn
// which models it serves, recording the model<->server associations.
58  : verbose_(pset.getUntrackedParameter<bool>("verbose")),
59  fallbackOpts_(pset.getParameterSet("fallback")),
60  currentModuleId_(0),
61  allowAddModel_(false),
62  startedFallback_(false),
63  pid_(std::to_string(::getpid())) {
64  //module construction is assumed to be serial (correct at the time this code was written)
68  //fallback server will be launched (if needed) before beginJob
71 
72  //include fallback server in set if enabled
73  if (fallbackOpts_.enable) {
// Default to Remote; downgrade/upgrade based on GPU availability at build time.
74  auto serverType = TritonServerType::Remote;
75  if (!fallbackOpts_.useGPU)
76  serverType = TritonServerType::LocalCPU;
77 #ifdef TRITON_ENABLE_GPU
78  else
79  serverType = TritonServerType::LocalGPU;
80 #endif
81 
// piecewise_construct: build the Server in place from (name, address, type).
82  servers_.emplace(std::piecewise_construct,
83  std::forward_as_tuple(Server::fallbackName),
84  std::forward_as_tuple(Server::fallbackName, Server::fallbackAddress, serverType));
85  }
86 
87  //loop over input servers: check which models they have
89  if (verbose_)
90  msg = "List of models for each server:\n";
91  for (const auto& serverPset : pset.getUntrackedParameterSetVector("servers")) {
92  const std::string& serverName(serverPset.getUntrackedParameter<std::string>("name"));
93  //ensure uniqueness
94  auto [sit, unique] = servers_.emplace(serverName, serverPset);
95  if (!unique)
96  throw cms::Exception("DuplicateServer")
97  << "TritonService: Not allowed to specify more than one server with same name (" << serverName << ")";
98  auto& server(sit->second);
99 
// Create a gRPC client for this server (the macro opener on the next call was
// elided by the generator; the message below belongs to that check).
100  std::unique_ptr<tc::InferenceServerGrpcClient> client;
102  tc::InferenceServerGrpcClient::Create(&client, server.url, false, server.useSsl, server.sslOptions),
103  "TritonService(): unable to create inference context for " + serverName + " (" + server.url + ")");
104 
105  if (verbose_) {
106  inference::ServerMetadataResponse serverMetaResponse;
107  TRITON_THROW_IF_ERROR(client->ServerMetadata(&serverMetaResponse),
108  "TritonService(): unable to get metadata for " + serverName + " (" + server.url + ")");
109  edm::LogInfo("TritonService") << "Server " << serverName << ": url = " << server.url
110  << ", version = " << serverMetaResponse.version();
111  }
112 
// Query the server's model repository index to learn the models it serves.
113  inference::RepositoryIndexResponse repoIndexResponse;
115  client->ModelRepositoryIndex(&repoIndexResponse),
116  "TritonService(): unable to get repository index for " + serverName + " (" + server.url + ")");
117 
118  //servers keep track of models and vice versa
119  if (verbose_)
120  msg += serverName + ": ";
121  for (const auto& modelIndex : repoIndexResponse.models()) {
122  const auto& modelName = modelIndex.name();
// Insert the model with an empty path if not yet known, then cross-link.
123  auto mit = models_.find(modelName);
124  if (mit == models_.end())
125  mit = models_.emplace(modelName, "").first;
126  auto& modelInfo(mit->second);
127  modelInfo.servers.insert(serverName);
128  server.models.insert(modelName);
129  if (verbose_)
130  msg += modelName + ", ";
131  }
132  if (verbose_)
133  msg += "\n";
134  }
135  if (verbose_)
136  edm::LogInfo("TritonService") << msg;
137 }
138 
// preModuleConstruction (signature line elided by the generator).
// Records which module is currently being constructed and opens the window in
// which addModel() calls are permitted (closed again elsewhere).
140  currentModuleId_ = desc.id();
141  allowAddModel_ = true;
142 }
143 
// addModel(modelName, path) — signature line elided by the generator.
// Called from module constructors to declare a needed model; if no configured
// server provides it, the model is queued in unservedModels_ for the fallback
// server. Throws DisallowedAddModel outside the construction window.
145  //should only be called in module constructors
146  if (!allowAddModel_)
147  throw cms::Exception("DisallowedAddModel")
148  << "TritonService: Attempt to call addModel() outside of module constructors";
149  //if model is not in the list, then no specified server provides it
150  auto mit = models_.find(modelName);
151  if (mit == models_.end()) {
// emplace is a no-op if the model was already queued; either way record the module.
152  auto& modelInfo(unservedModels_.emplace(modelName, path).first->second);
153  modelInfo.modules.insert(currentModuleId_);
// NOTE(review): a line recording the module in modules_ appears to be elided
// here (gap in the inline numbering) — confirm against the repository source.
154  //only keep track of modules that need unserved models
156  }
157 }
158 
160 
// preModuleDestruction (signature line elided by the generator).
// When a module is destroyed, drop it from the bookkeeping; if it was the last
// module needing an unserved model, drop that model too.
162  //remove destructed modules from unserved list
163  if (unservedModels_.empty())
164  return;
165  auto id = desc.id();
166  auto oit = modules_.find(id);
167  if (oit != modules_.end()) {
168  const auto& moduleInfo(oit->second);
169  auto mit = unservedModels_.find(moduleInfo.model);
170  if (mit != unservedModels_.end()) {
171  auto& modelInfo(mit->second);
172  modelInfo.modules.erase(id);
173  //remove a model if it is no longer needed by any modules
174  if (modelInfo.modules.empty())
175  unservedModels_.erase(mit);
176  }
177  modules_.erase(oit);
178  }
179 }
180 
// serverInfo(model, preferred) — signature line elided by the generator.
// Selects a server that provides the requested model, honoring the preferred
// name when available; otherwise falls back to an arbitrary provider.
// Throws MissingModel if no server provides the model.
// NOTE(review): the comment below mentions a "second return value" but the
// visible code returns a single Server — possibly stale; confirm against the
// current header.
181 //second return value is only true if fallback CPU server is being used
183  auto mit = models_.find(model);
184  if (mit == models_.end())
185  throw cms::Exception("MissingModel") << "TritonService: There are no servers that provide model " << model;
186  const auto& modelInfo(mit->second);
187  const auto& modelServers = modelInfo.servers;
188 
189  auto msit = modelServers.end();
190  if (!preferred.empty()) {
191  msit = modelServers.find(preferred);
192  //todo: add a "strict" parameter to stop execution if preferred server isn't found?
193  if (msit == modelServers.end())
194  edm::LogWarning("PreferredServer") << "Preferred server " << preferred << " for model " << model
195  << " not available, will choose another server";
196  }
// Fall back to the first provider when the preferred one is absent.
197  const auto& serverName(msit == modelServers.end() ? *modelServers.begin() : preferred);
198 
199  //todo: use some algorithm to select server rather than just picking arbitrarily
200  const auto& server(servers_.find(serverName)->second);
201  return server;
202 }
203 
// preBeginJob — signature line elided by the generator; several interior lines
// (command-building arguments, the msg/command declarations) are also elided
// (gaps in the inline numbering). Confirm the full text against the repository.
// Visible behavior: if any models are unserved, registers them with the
// fallback server entry, assembles and runs the cmsTriton start command, and
// parses the gRPC port from the script output.
205  //only need fallback if there are unserved models
206  if (!fallbackOpts_.enable or unservedModels_.empty())
207  return;
208 
210  if (verbose_)
211  msg = "List of models for fallback server: ";
212  //all unserved models are provided by fallback server
213  auto& server(servers_.find(Server::fallbackName)->second);
214  for (const auto& [modelName, model] : unservedModels_) {
215  auto& modelInfo(models_.emplace(modelName, model).first->second);
216  modelInfo.servers.insert(Server::fallbackName);
217  server.models.insert(modelName);
218  if (verbose_)
219  msg += modelName + ", ";
220  }
221  if (verbose_)
222  edm::LogInfo("TritonService") << msg;
223 
224  //assemble server start command
// -P -1: presumably lets the script pick the port; -p passes this process's
// pid so the server can shut down if the job dies — TODO confirm vs cmsTriton.
225  fallbackOpts_.command = "cmsTriton -P -1 -p " + pid_;
226  if (fallbackOpts_.debug)
227  fallbackOpts_.command += " -c";
229  fallbackOpts_.command += " -v";
231  fallbackOpts_.command += " -d";
232  if (fallbackOpts_.useGPU)
233  fallbackOpts_.command += " -g";
234  if (!fallbackOpts_.instanceName.empty())
236  if (fallbackOpts_.retries >= 0)
238  if (fallbackOpts_.wait >= 0)
// one -m flag per unserved model path
240  for (const auto& [modelName, model] : unservedModels_) {
241  fallbackOpts_.command += " -m " + model.path;
242  }
243  if (!fallbackOpts_.imageName.empty())
245  if (!fallbackOpts_.sandboxName.empty())
247  //don't need this anymore
248  unservedModels_.clear();
249 
250  //get a random temporary directory if none specified
251  if (fallbackOpts_.tempDir.empty()) {
252  auto tmp_dir_path{std::filesystem::temp_directory_path() /= edm::createGlobalIdentifier()};
253  fallbackOpts_.tempDir = tmp_dir_path.string();
254  }
255  //special case ".": use script default (temp dir = .$instanceName)
256  if (fallbackOpts_.tempDir != ".")
258 
260 
261  if (fallbackOpts_.debug)
262  edm::LogInfo("TritonService") << "Fallback server temporary directory: " << fallbackOpts_.tempDir;
263  if (verbose_)
264  edm::LogInfo("TritonService") << command;
265 
266  //mark as started before executing in case of ctrl+c while command is running
267  startedFallback_ = true;
268  const auto& [output, rv] = execSys(command);
269  if (rv != 0) {
270  edm::LogError("TritonService") << output;
271  printFallbackServerLog<edm::LogError>();
272  throw cms::Exception("FallbackFailed")
273  << "TritonService: Starting the fallback server failed with exit code " << rv;
274  } else if (verbose_)
275  edm::LogInfo("TritonService") << output;
276  //get the port
277  const std::string& portIndicator("CMS_TRITON_GRPC_PORT: ");
278  //find last instance in log in case multiple ports were tried
279  auto pos = output.rfind(portIndicator);
280  if (pos != std::string::npos) {
281  auto pos2 = pos + portIndicator.size();
282  auto pos3 = output.find('\n', pos2);
// port number is the text between the indicator and the end of that line
283  const auto& portNum = output.substr(pos2, pos3 - pos2);
284  server.url += ":" + portNum;
285  } else
286  throw cms::Exception("FallbackFailed") << "TritonService: Unknown port for fallback server, log follows:\n"
287  << output;
288 }
289 
// postEndJob — signature line elided by the generator; the line constructing
// the stop `command` is also elided (gap in the inline numbering).
// Stops the fallback server (if one was started) and surfaces its log on
// failure, or in verbose mode on success.
291  if (!startedFallback_)
292  return;
293 
295  if (verbose_)
296  edm::LogInfo("TritonService") << command;
297 
298  const auto& [output, rv] = execSys(command);
299  if (rv != 0) {
300  edm::LogError("TritonService") << output;
301  printFallbackServerLog<edm::LogError>();
302  throw cms::Exception("FallbackFailed")
303  << "TritonService: Stopping the fallback server failed with exit code " << rv;
304  } else if (verbose_) {
305  edm::LogInfo("TritonService") << output;
306  printFallbackServerLog<edm::LogInfo>();
307  }
308 }
309 
// printFallbackServerLog<LOG>() — signature line elided by the generator, as
// is the final line of the not-found message (gap in the inline numbering).
// Emits the fallback server's log through the LOG message category, checking
// the current directory first and the temp directory second.
310 template <typename LOG>
312  std::vector<std::string> logNames{"log_" + fallbackOpts_.instanceName + ".log"};
313  //cmsTriton script moves log from temp to current dir in verbose mode or in some cases when auto_stop is called
314  // -> check both places
315  logNames.push_back(fallbackOpts_.tempDir + "/" + logNames[0]);
316  bool foundLog = false;
317  for (const auto& logName : logNames) {
318  std::ifstream infile(logName);
319  if (infile.is_open()) {
// stream the whole file into the log message via rdbuf()
320  LOG("TritonService") << "TritonService: server log " << logName << "\n" << infile.rdbuf();
321  foundLog = true;
322  break;
323  }
324  }
325  if (!foundLog)
326  LOG("TritonService") << "TritonService: could not find server log " << logNames[0] << " in current directory or "
328 }
329 
// fillDescriptions — signature and the top-level desc/validator declarations
// were elided by the generator (gaps in the inline numbering).
// Declares the service's configuration: a verbose flag, a list of remote
// server psets (name/address/port/SSL options), and the fallback-server
// options pset with its defaults.
332  desc.addUntracked<bool>("verbose", false);
333 
// per-server validator: required name/address/port, optional SSL settings
335  validator.addUntracked<std::string>("name");
336  validator.addUntracked<std::string>("address");
337  validator.addUntracked<unsigned>("port");
338  validator.addUntracked<bool>("useSsl", false);
339  validator.addUntracked<std::string>("rootCertificates", "");
340  validator.addUntracked<std::string>("privateKey", "");
341  validator.addUntracked<std::string>("certificateChain", "");
342 
343  desc.addVPSetUntracked("servers", validator, {});
344 
// fallback server options; negative retries/wait mean "use script default"
345  edm::ParameterSetDescription fallbackDesc;
346  fallbackDesc.addUntracked<bool>("enable", false);
347  fallbackDesc.addUntracked<bool>("debug", false);
348  fallbackDesc.addUntracked<bool>("verbose", false);
349  fallbackDesc.addUntracked<bool>("useDocker", false);
350  fallbackDesc.addUntracked<bool>("useGPU", false);
351  fallbackDesc.addUntracked<int>("retries", -1);
352  fallbackDesc.addUntracked<int>("wait", -1);
353  fallbackDesc.addUntracked<std::string>("instanceBaseName", "triton_server_instance");
354  fallbackDesc.addUntracked<std::string>("instanceName", "");
355  fallbackDesc.addUntracked<std::string>("tempDir", "");
356  fallbackDesc.addUntracked<std::string>("imageName", "");
357  fallbackDesc.addUntracked<std::string>("sandboxName", "");
358  desc.add<edm::ParameterSetDescription>("fallback", fallbackDesc);
359 
360  descriptions.addWithDefaultLabel(desc);
361 }
void watchPostModuleConstruction(PostModuleConstruction::slot_type const &iSlot)
void addWithDefaultLabel(ParameterSetDescription const &psetDescription)
std::unordered_map< std::string, Model > models_
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void watchPostEndJob(PostEndJob::slot_type const &iSlot)
std::unordered_map< std::string, Model > unservedModels_
#define TRITON_THROW_IF_ERROR(X, MSG)
Definition: triton_utils.h:75
void watchPreModuleConstruction(PreModuleConstruction::slot_type const &iSlot)
void postModuleConstruction(edm::ModuleDescription const &)
void watchPreModuleDestruction(PreModuleDestruction::slot_type const &iSlot)
std::string to_string(const V &value)
Definition: OMSAccess.h:77
unsigned currentModuleId_
#define LOG(A)
Log< level::Error, false > LogError
TritonService(const edm::ParameterSet &pset, edm::ActivityRegistry &areg)
static const std::string fallbackAddress
Definition: TritonService.h:87
void addModel(const std::string &modelName, const std::string &path)
FallbackOpts fallbackOpts_
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
def unique(seq, keepstr=True)
Definition: tier0.py:24
std::string pid_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void printFallbackServerLog() const
def pipe(cmdline, input=None)
Definition: pipe.py:5
void preModuleConstruction(edm::ModuleDescription const &)
std::string createGlobalIdentifier(bool binary=false)
Log< level::Info, false > LogInfo
void preModuleDestruction(edm::ModuleDescription const &)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::unordered_map< unsigned, Module > modules_
tuple msg
Definition: mps_check.py:286
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
ParameterSet const & getParameterSet(ParameterSetID const &id)
list command
Definition: mps_check.py:25
Server serverInfo(const std::string &model, const std::string &preferred="") const
list cmd
Definition: mps_setup.py:244
Definition: output.py:1
Definition: pipe.py:1
std::unordered_map< std::string, Server > servers_
static const std::string fallbackName
Definition: TritonService.h:86