TritonService.cc
//CMSSW framework and SonicTriton headers used below (paths assume the standard CMSSW layout)
#include "HeterogeneousCore/SonicTriton/interface/TritonService.h"
#include "HeterogeneousCore/SonicTriton/interface/triton_utils.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/GlobalIdentifier.h"

#include "grpc_client.h"
#include "grpc_service.pb.h"

#include <array>
#include <cstdio>
#include <cstdlib>
#include <filesystem>
#include <memory>
#include <utility>
#include <tuple>
#include <unistd.h>

namespace ni = nvidia::inferenceserver;
namespace nic = ni::client;

namespace {
  //run a shell command and capture its combined output and exit status
  std::pair<std::string, int> execSys(const std::string& cmd) {
    //redirect stderr to stdout
    auto pipe = popen((cmd + " 2>&1").c_str(), "r");
    int thisErrno = errno;
    if (!pipe)
      throw cms::Exception("SystemError") << "popen() failed with errno " << thisErrno << " for command: " << cmd;

    //extract output
    constexpr static unsigned buffSize = 128;
    std::array<char, buffSize> buffer;
    std::string result;
    while (!feof(pipe)) {
      if (fgets(buffer.data(), buffSize, pipe))
        result += buffer.data();
      else {
        thisErrno = ferror(pipe);
        if (thisErrno)
          throw cms::Exception("SystemError") << "failed reading command output with errno " << thisErrno;
      }
    }

    int rv = pclose(pipe);
    return std::make_pair(result, rv);
  }
}  // namespace
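
// Usage note (illustrative sketch, not part of the original file): execSys() returns the merged
// stdout+stderr text together with the raw status from pclose(); preBeginJob() below treats any
// nonzero status as a failure. A minimal call would look like:
//
//   const auto& [out, status] = execSys("echo hello");
//   if (status != 0) { /* command failed; "out" holds its combined output */ }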

TritonService::TritonService(const edm::ParameterSet& pset, edm::ActivityRegistry& areg)
    : verbose_(pset.getUntrackedParameter<bool>("verbose")),
      fallbackOpts_(pset.getParameterSet("fallback")),
      currentModuleId_(0),
      allowAddModel_(false),
      startedFallback_(false) {
  //module construction is assumed to be serial (correct at the time this code was written)
  areg.watchPreModuleConstruction(this, &TritonService::preModuleConstruction);
  areg.watchPostModuleConstruction(this, &TritonService::postModuleConstruction);
  areg.watchPreModuleDestruction(this, &TritonService::preModuleDestruction);
  //fallback server will be launched (if needed) before beginJob
  areg.watchPreBeginJob(this, &TritonService::preBeginJob);

  //include fallback server in set if enabled
  if (fallbackOpts_.enable)
    servers_.emplace(std::piecewise_construct,
                     std::forward_as_tuple(Server::fallbackName),
                     std::forward_as_tuple(Server::fallbackName, Server::fallbackAddress));

  //loop over input servers: check which models they have
  std::string msg;
  if (verbose_)
    msg = "List of models for each server:\n";
  for (const auto& serverPset : pset.getUntrackedParameterSetVector("servers")) {
    const std::string& serverName(serverPset.getUntrackedParameter<std::string>("name"));
    //ensure uniqueness
    auto [sit, unique] = servers_.emplace(serverName, serverPset);
    if (!unique)
      throw cms::Exception("DuplicateServer")
          << "Not allowed to specify more than one server with same name (" << serverName << ")";
    auto& serverInfo(sit->second);

    std::unique_ptr<nic::InferenceServerGrpcClient> client;
    triton_utils::throwIfError(
        nic::InferenceServerGrpcClient::Create(&client, serverInfo.url, false),
        "TritonService(): unable to create inference context for " + serverName + " (" + serverInfo.url + ")");

    inference::RepositoryIndexResponse repoIndexResponse;
    triton_utils::throwIfError(
        client->ModelRepositoryIndex(&repoIndexResponse),
        "TritonService(): unable to get repository index for " + serverName + " (" + serverInfo.url + ")");

    //servers keep track of models and vice versa
    if (verbose_)
      msg += serverName + ": ";
    for (const auto& modelIndex : repoIndexResponse.models()) {
      const auto& modelName = modelIndex.name();
      auto mit = models_.find(modelName);
      if (mit == models_.end())
        mit = models_.emplace(modelName, "").first;
      auto& modelInfo(mit->second);
      modelInfo.servers.insert(serverName);
      serverInfo.models.insert(modelName);
      if (verbose_)
        msg += modelName + ", ";
    }
    if (verbose_)
      msg += "\n";
  }
  if (verbose_)
    edm::LogInfo("TritonService") << msg;
}
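
// The constructor above discovers models by querying each configured server's model repository
// over gRPC. A minimal standalone sketch of that query, using only the client calls already
// used above (the address "localhost:8001" is a hypothetical example value), would be:
//
//   std::unique_ptr<nic::InferenceServerGrpcClient> client;
//   nic::InferenceServerGrpcClient::Create(&client, "localhost:8001", false);
//   inference::RepositoryIndexResponse index;
//   client->ModelRepositoryIndex(&index);
//   for (const auto& m : index.models())
//     edm::LogInfo("TritonService") << m.name();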

void TritonService::preModuleConstruction(edm::ModuleDescription const& desc) {
  currentModuleId_ = desc.id();
  allowAddModel_ = true;
}

void TritonService::addModel(const std::string& modelName, const std::string& path) {
  //should only be called in module constructors
  if (!allowAddModel_)
    throw cms::Exception("DisallowedAddModel") << "Attempt to call addModel() outside of module constructors";
  //if model is not in the list, then no specified server provides it
  auto mit = models_.find(modelName);
  if (mit == models_.end()) {
    auto& modelInfo(unservedModels_.emplace(modelName, path).first->second);
    modelInfo.modules.insert(currentModuleId_);
    //only keep track of modules that need unserved models
    modules_.emplace(currentModuleId_, modelName);
  }
}
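
// Call pattern (hedged sketch, not part of this file): a module registers the model it needs
// from within its constructor, the only window in which allowAddModel_ is true. The module
// name and model path below are illustrative.
//
//   MyProducer::MyProducer(const edm::ParameterSet& cfg) {
//     edm::Service<TritonService> ts;
//     ts->addModel("my_model", "/path/to/model_repository/my_model/config.pbtxt");
//   }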

void TritonService::postModuleConstruction(edm::ModuleDescription const& desc) { allowAddModel_ = false; }

void TritonService::preModuleDestruction(edm::ModuleDescription const& desc) {
  //remove destructed modules from unserved list
  if (unservedModels_.empty())
    return;
  auto id = desc.id();
  auto oit = modules_.find(id);
  if (oit != modules_.end()) {
    const auto& moduleInfo(oit->second);
    auto mit = unservedModels_.find(moduleInfo.model);
    if (mit != unservedModels_.end()) {
      auto& modelInfo(mit->second);
      modelInfo.modules.erase(id);
      //remove a model if it is no longer needed by any modules
      if (modelInfo.modules.empty())
        unservedModels_.erase(mit);
    }
    modules_.erase(oit);
  }
}

//second return value is only true if fallback CPU server is being used
std::pair<std::string, bool> TritonService::serverAddress(const std::string& model,
                                                          const std::string& preferred) const {
  auto mit = models_.find(model);
  if (mit == models_.end())
    throw cms::Exception("MissingModel") << "There are no servers that provide model " << model;
  const auto& modelInfo(mit->second);
  const auto& modelServers = modelInfo.servers;

  auto msit = modelServers.end();
  if (!preferred.empty()) {
    msit = modelServers.find(preferred);
    //todo: add a "strict" parameter to stop execution if preferred server isn't found?
    if (msit == modelServers.end())
      edm::LogWarning("PreferredServer") << "Preferred server " << preferred << " for model " << model
                                         << " not available, will choose another server";
  }
  const auto& serverName(msit == modelServers.end() ? *modelServers.begin() : preferred);

  //todo: use some algorithm to select server rather than just picking arbitrarily
  const auto& serverInfo(servers_.find(serverName)->second);
  bool isFallbackCPU = serverInfo.isFallback and !fallbackOpts_.useGPU;
  return std::make_pair(serverInfo.url, isFallbackCPU);
}
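
// Illustrative use of serverAddress() (sketch, not part of this file): resolve the gRPC URL for
// a model, optionally preferring a named server; the second element flags the CPU fallback.
// The model and server names below are hypothetical.
//
//   const auto& [url, isFallbackCPU] = ts->serverAddress("my_model", "my_preferred_server");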

void TritonService::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&) {
  //only need fallback if there are unserved models
  if (!fallbackOpts_.enable or unservedModels_.empty())
    return;

  std::string msg;
  if (verbose_)
    msg = "List of models for fallback server: ";
  //all unserved models are provided by fallback server
  auto& serverInfo(servers_.find(Server::fallbackName)->second);
  for (const auto& [modelName, model] : unservedModels_) {
    auto& modelInfo(models_.emplace(modelName, model).first->second);
    modelInfo.servers.insert(Server::fallbackName);
    serverInfo.models.insert(modelName);
    if (verbose_)
      msg += modelName + ", ";
  }
  if (verbose_)
    edm::LogInfo("TritonService") << msg;

  //randomize instance name
  if (fallbackOpts_.instanceName.empty()) {
    fallbackOpts_.instanceName = "triton_server_instance_" + edm::createGlobalIdentifier();
  }

  //assemble server start command
  std::string command("cmsTriton -P -1 -p " + std::to_string(::getpid()));
  if (fallbackOpts_.debug)
    command += " -c";
  if (fallbackOpts_.verbose)
    command += " -v";
  if (fallbackOpts_.useDocker)
    command += " -d";
  if (fallbackOpts_.useGPU)
    command += " -g";
  if (!fallbackOpts_.instanceName.empty())
    command += " -n " + fallbackOpts_.instanceName;
  if (fallbackOpts_.retries >= 0)
    command += " -r " + std::to_string(fallbackOpts_.retries);
  if (fallbackOpts_.wait >= 0)
    command += " -w " + std::to_string(fallbackOpts_.wait);
  for (const auto& [modelName, model] : unservedModels_) {
    command += " -m " + model.path;
  }
  //don't need this anymore
  unservedModels_.clear();

  //get a random temporary directory if none specified
  if (fallbackOpts_.tempDir.empty()) {
    auto tmp_dir_path{std::filesystem::temp_directory_path() /= edm::createGlobalIdentifier()};
    fallbackOpts_.tempDir = tmp_dir_path.string();
  }
  //special case ".": use script default (temp dir = .$instanceName)
  if (fallbackOpts_.tempDir != ".")
    command += " -t " + fallbackOpts_.tempDir;

  command += " start";

  if (fallbackOpts_.debug)
    edm::LogInfo("TritonService") << "Fallback server temporary directory: " << fallbackOpts_.tempDir;
  if (verbose_)
    edm::LogInfo("TritonService") << command;

  //mark as started before executing in case of ctrl+c while command is running
  startedFallback_ = true;
  const auto& [output, rv] = execSys(command);
  if (verbose_ or rv != 0)
    edm::LogInfo("TritonService") << output;
  if (rv != 0)
    throw cms::Exception("FallbackFailed") << "Starting the fallback server failed with exit code " << rv;

  //get the port
  const std::string& portIndicator("CMS_TRITON_GRPC_PORT: ");
  //find last instance in log in case multiple ports were tried
  auto pos = output.rfind(portIndicator);
  if (pos != std::string::npos) {
    auto pos2 = pos + portIndicator.size();
    auto pos3 = output.find('\n', pos2);
    const auto& portNum = output.substr(pos2, pos3 - pos2);
    serverInfo.url += ":" + portNum;
  } else
    throw cms::Exception("FallbackFailed") << "Unknown port for fallback server, log follows:\n" << output;
}
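
// For reference, a fully assembled fallback command (hedged example; the PID, instance name,
// retries/wait values, model path, and temporary directory are illustrative) would look like:
//
//   cmsTriton -P -1 -p 12345 -v -g -n triton_server_instance_abc123 -r 2 -w 300
//     -m /path/to/model_repository/my_model/config.pbtxt -t /tmp/abc123 start
//
// The script output is expected to contain "CMS_TRITON_GRPC_PORT: <port>", which is parsed
// above to complete the fallback server URL.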

void TritonService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
  edm::ParameterSetDescription desc;
  desc.addUntracked<bool>("verbose", false);

  edm::ParameterSetDescription validator;
  validator.addUntracked<std::string>("name");
  validator.addUntracked<std::string>("address");
  validator.addUntracked<unsigned>("port");

  desc.addVPSetUntracked("servers", validator, {});

  edm::ParameterSetDescription fallbackDesc;
  fallbackDesc.addUntracked<bool>("enable", false);
  fallbackDesc.addUntracked<bool>("debug", false);
  fallbackDesc.addUntracked<bool>("verbose", false);
  fallbackDesc.addUntracked<bool>("useDocker", false);
  fallbackDesc.addUntracked<bool>("useGPU", false);
  fallbackDesc.addUntracked<int>("retries", -1);
  fallbackDesc.addUntracked<int>("wait", -1);
  fallbackDesc.addUntracked<std::string>("instanceName", "");
  fallbackDesc.addUntracked<std::string>("tempDir", "");
  desc.add<edm::ParameterSetDescription>("fallback", fallbackDesc);

  descriptions.addWithDefaultLabel(desc);
}
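
// Configuration summary (descriptive comment): "verbose" toggles the logging above; "servers"
// is an untracked VPSet of {name, address, port} entries naming external Triton servers;
// "fallback" controls the locally launched cmsTriton server (enable, debug, verbose, useDocker,
// useGPU, retries, wait, instanceName, tempDir).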