11 #include "grpc_client.h" 12 #include "grpc_service.pb.h" 13 #include "model_config.pb.h" 15 #include "google/protobuf/text_format.h" 16 #include "google/protobuf/io/zero_copy_stream_impl.h" 21 #include <experimental/iterator> 31 grpc_compression_algorithm getCompressionAlgo(
const std::string&
name) {
33 return grpc_compression_algorithm::GRPC_COMPRESS_NONE;
34 else if (
name.compare(
"deflate") == 0)
35 return grpc_compression_algorithm::GRPC_COMPRESS_DEFLATE;
36 else if (
name.compare(
"gzip") == 0)
37 return grpc_compression_algorithm::GRPC_COMPRESS_GZIP;
40 <<
"Unknown compression algorithm requested: " <<
name <<
" (choices: none, deflate, gzip)";
//convert the raw InferResult pointers returned by the Triton client into shared_ptr
std::vector<std::shared_ptr<tc::InferResult>> convertToShared(const std::vector<tc::InferResult*>& tmp) {
  std::vector<std::shared_ptr<tc::InferResult>> results;
  results.reserve(tmp.size());
  std::transform(tmp.begin(), tmp.end(), std::back_inserter(results), [](tc::InferResult* ptr) {
    return std::shared_ptr<tc::InferResult>(ptr);
  });
  return results;
}
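// Ownership note (a usage sketch, not additional code in this file): the Triton client hands back
// bare InferResult* that the caller must delete; wrapping each one in a std::shared_ptr guarantees
// cleanup even if result handling throws.
//   std::vector<tc::InferResult*> raw;   // filled by InferMulti/AsyncInferMulti
//   auto shared = convertToShared(raw);  // takes ownership of every pointer
//   // 'raw' must not be deleted afterwards; the shared_ptrs now manage the memory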
TritonClient::TritonClient(const edm::ParameterSet& params, const std::string& debugName)
    : SonicClient(params, debugName, "TritonClient"),
      manualBatchMode_(false),
      verbose_(params.getUntrackedParameter<bool>("verbose")),
      useSharedMemory_(params.getUntrackedParameter<bool>("useSharedMemory")),
      compressionAlgo_(getCompressionAlgo(params.getUntrackedParameter<std::string>("compression"))) {
  // ... (obtaining the TritonService handle ts, server discovery, creation of the per-model
  //      InferOptions in options_, and creation of the gRPC inference context elided;
  //      a failure there is reported as "TritonClient(): unable to create inference context") ...

  //set options: the timeout is given in the configured unit and converted to microseconds
  options_[0].client_timeout_ = params.getUntrackedParameter<unsigned>("timeout");
  const auto& timeoutUnit = params.getUntrackedParameter<std::string>("timeoutUnit");
  unsigned conversion = 1;
  if (timeoutUnit == "seconds")
    conversion = 1000000;
  else if (timeoutUnit == "milliseconds")
    conversion = 1000;
  else if (timeoutUnit == "microseconds")
    conversion = 1;
  else
    throw cms::Exception("Configuration") << "Unknown timeout unit: " << timeoutUnit;
  options_[0].client_timeout_ *= conversion;
  //load the local copy of the model config
  inference::ModelConfig localModelConfig;
  const std::string localModelConfigPath(params.getParameter<edm::FileInPath>("modelConfigPath").fullPath());
  int fileDescriptor = open(localModelConfigPath.c_str(), O_RDONLY);
  if (fileDescriptor < 0)
    throw TritonException("LocalFailure")
        << "TritonClient(): unable to open local model config: " << localModelConfigPath;
  google::protobuf::io::FileInputStream localModelConfigInput(fileDescriptor);
  localModelConfigInput.SetCloseOnDelete(true);
  if (!google::protobuf::TextFormat::Parse(&localModelConfigInput, &localModelConfig))
    throw TritonException("LocalFailure")
        << "TritonClient(): unable to parse local model config: " << localModelConfigPath;
  //get the model config known to the server
  inference::ModelConfigResponse modelConfigResponse;
  TRITON_THROW_IF_ERROR(
      client_->ModelConfig(&modelConfigResponse, options_[0].model_name_, options_[0].model_version_, headers_),
      "TritonClient(): unable to get model config",
      isLocal_);
  inference::ModelConfig remoteModelConfig(modelConfigResponse.config());
  //compare model checksum(s) between the local and remote configs
  std::map<std::string, std::array<std::string, 2>> checksums;
  size_t fileCounter = 0;
  for (const auto& modelConfig : {localModelConfig, remoteModelConfig}) {
    const auto& agents = modelConfig.model_repository_agents().agents();
    auto agent = std::find_if(agents.begin(), agents.end(), [](auto const& a) { return a.name() == "checksum"; });
    if (agent != agents.end()) {
      const auto& params = agent->parameters();
      for (const auto& [key, val] : params) {
        //only consider checksums for the requested model version
        if (key.compare(0, options_[0].model_version_.size() + 1, options_[0].model_version_ + "/") == 0)
          checksums[key][fileCounter] = val;
      }
    }
    ++fileCounter;
  }
  std::vector<std::string> incorrect;
  for (const auto& [key, val] : checksums) {
    if (checksums[key][0] != checksums[key][1])
      incorrect.push_back(key);
  }
  if (!incorrect.empty())
    throw TritonException("ModelVersioning") << "The following files have incorrect checksums on the remote server: "
                                             << triton_utils::printColl(incorrect);
157 "TritonClient(): unable to get model metadata",
161 const auto& nicInputs = modelMetadata.inputs();
162 const auto& nicOutputs = modelMetadata.outputs();
165 std::stringstream
msg;
169 if (nicInputs.empty())
170 msg <<
"Model on server appears malformed (zero inputs)\n";
172 if (nicOutputs.empty())
173 msg <<
"Model on server appears malformed (zero outputs)\n";
177 if (!msg_str.empty())
  //setup input map
  std::stringstream io_msg;
  io_msg << "Model inputs: "
         << "\n";
  for (const auto& nicInput : nicInputs) {
    const auto& iname = nicInput.name();
    auto [curr_itr, success] = input_.emplace(std::piecewise_construct,
                                              std::forward_as_tuple(iname),
                                              std::forward_as_tuple(iname, nicInput, this, ts->pid()));
    auto& curr_input = curr_itr->second;
    io_msg << "  " << iname << " (" << curr_input.dname() << ", " << curr_input.byteSize()
           << " b)\n";  // (per-input shape printout elided)
  }

  //allow selecting only some outputs via the untracked "outputs" parameter
  const auto& v_outputs = params.getUntrackedParameter<std::vector<std::string>>("outputs");
  std::unordered_set s_outputs(v_outputs.begin(), v_outputs.end());
  //setup output map
  io_msg << "Model outputs: "
         << "\n";
  for (const auto& nicOutput : nicOutputs) {
    const auto& oname = nicOutput.name();
    //skip outputs that were not requested
    if (!s_outputs.empty() and s_outputs.find(oname) == s_outputs.end())
      continue;
    auto [curr_itr, success] = output_.emplace(std::piecewise_construct,
                                               std::forward_as_tuple(oname),
                                               std::forward_as_tuple(oname, nicOutput, this, ts->pid()));
    auto& curr_output = curr_itr->second;
    io_msg << "  " << oname << " (" << curr_output.dname() << ", " << curr_output.byteSize()
           << " b)\n";  // (per-output shape printout elided)
    if (!s_outputs.empty())
      s_outputs.erase(oname);
  }

  //check that all requested outputs were found on the server
  if (!s_outputs.empty())
    throw cms::Exception("MissingOutput") << "Some requested outputs were not available on the server: "
                                          << triton_utils::printColl(s_outputs);
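// Example of the effect of the untracked "outputs" parameter (illustrative value): with
// outputs = {"softmax"} only that entry of the server metadata is registered in output_, and any
// requested name the server does not provide triggers the exception above. An empty list (the
// default) keeps every output advertised by the model.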
  //print model info
  std::stringstream model_msg;
  model_msg << "Model name: " << options_[0].model_name_ << "\n"
            << "Model version: " << options_[0].model_version_ << "\n";
  // ... (max batch size determination, shared-memory setup, and verbose printouts elided) ...
}

// ... (additional member definitions elided) ...

bool TritonClient::setBatchSize(unsigned bsize) {
  // ... (batch-mode-dependent checks elided; a request larger than the server limit is rejected
  //      with the message below) ...
  //   << "Requested batch size " << bsize << " exceeds server-specified max batch size "
  //   << maxOuterDim_ << ".";
  // ...
}
void TritonClient::resizeEntries(unsigned entry) {
  // ... (bounds handling elided) ...
  for (auto& element : input_) {
    element.second.entries_.resize(entry);
  }
  for (auto& element : output_) {
    element.second.entries_.resize(entry);
  }
}

void TritonClient::addEntry(unsigned entry) {
  for (auto& element : input_) {
    element.second.addEntryImpl(entry);
  }
  for (auto& element : output_) {
    element.second.addEntryImpl(entry);
  }
  // ... (batch-mode bookkeeping elided) ...
}

void TritonClient::reset() {
  // ... (batch-mode bookkeeping elided) ...
  for (auto& element : input_) {
    element.second.reset();
  }
  for (auto& element : output_) {
    element.second.reset();
  }
}
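// Editorial note (assumes the SonicTriton data classes TritonInputData/TritonOutputData): input_ and
// output_ map tensor names to data objects, each of which keeps one "entry" per concurrent request.
// Resizing the entries here is what lets a single client serve both the rectangular-batch case (one
// entry with an outer batch dimension) and the ragged case (one entry per request), as exercised in
// evaluate() below.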
template <typename F>
bool TritonClient::handle_exception(F&& call) {
  try {
    call();
    return true;
  }
  //TritonExceptions are intended to be recoverable, i.e. retries are allowed
  catch (TritonException& e) {
    e.convertToWarning();
    finish(false);
    return false;
  }
  //other exceptions are not: execution should stop if they are encountered
  catch (...) {
    finish(false, std::current_exception());
    return false;
  }
}

void TritonClient::getResults(const std::vector<std::shared_ptr<tc::InferResult>>& results) {
  for (unsigned i = 0; i < results.size(); ++i) {
    const auto& result = results[i];
    for (auto& [oname, output] : output_) {
      //set the shape before the output data is used
      if (output.variableDims()) {
        std::vector<int64_t> tmp_shape;
        TRITON_THROW_IF_ERROR(
            result->Shape(oname, &tmp_shape), "getResults(): unable to get output shape for " + oname, false);
        //strip the outer (batch) dimension reported by the server
        tmp_shape.erase(tmp_shape.begin());
        // ... (store the shape and the result in the output object) ...
      }
      // ...
    }
  }
  // ...
}
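// Worked example (illustrative numbers): for an output declared with variable dimensions [-1, 10]
// and an inference run with batch size 8, the server reports shape {8, 10}; erasing the first
// element leaves the per-event shape {10} that downstream consumers use.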
void TritonClient::evaluate() {
  //in case there is nothing to process
  if (batchSize() == 0) {
    //call getResults on an empty vector
    std::vector<std::shared_ptr<tc::InferResult>> empty_results;
    getResults(empty_results);
    finish(true);
    return;
  }

  //set up the per-request input pointers for Triton (one vector per entry, to cover ragged batching)
  const auto& nEntriesVal = nEntries();
  std::vector<std::vector<triton::client::InferInput*>> inputsTriton(nEntriesVal);
  for (auto& inputTriton : inputsTriton) {
    inputTriton.reserve(input_.size());
  }
  for (auto& [iname, input] : input_) {
    for (unsigned i = 0; i < nEntriesVal; ++i) {
      inputsTriton[i].push_back(input.data(i));
    }
  }

  //set up the per-request output pointers similarly
  std::vector<std::vector<const triton::client::InferRequestedOutput*>> outputsTriton(nEntriesVal);
  for (auto& outputTriton : outputsTriton) {
    outputTriton.reserve(output_.size());
  }
  for (auto& [oname, output] : output_) {
    for (unsigned i = 0; i < nEntriesVal; ++i) {
      outputsTriton[i].push_back(output.data(i));
    }
  }

  //prepare the outputs (e.g. shared memory) before launching the request
  auto success = handle_exception([&]() {
    for (auto& element : output_) {
      element.second.prepare();
    }
  });
  if (!success)
    return;
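// Layout sketch (illustrative): with 2 inputs and nEntries() == 3 (ragged mode), inputsTriton is
//   { {in0.data(0), in1.data(0)}, {in0.data(1), in1.data(1)}, {in0.data(2), in1.data(2)} }
// i.e. one InferInput* list per request, which is the shape InferMulti/AsyncInferMulti expect.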
  //get current model statistics for later comparison
  inference::ModelStatistics start_status;
  success = handle_exception([&]() { start_status = getServerSideStatus(); });
  if (!success)
    return;

  if (mode_ == SonicMode::Async) {
    //non-blocking call: results are handled in the completion callback
    success = handle_exception([&]() {
      TRITON_THROW_IF_ERROR(
          client_->AsyncInferMulti(
              [start_status, this](std::vector<tc::InferResult*> resultsTmp) {
                //immediately convert the raw pointers to shared_ptr
                const auto& results = convertToShared(resultsTmp);
                //check the status of each result
                for (auto ptr : results) {
                  auto success = handle_exception([&]() {
                    TRITON_THROW_IF_ERROR(ptr->RequestStatus(), "evaluate(): unable to get result(s)", isLocal_);
                  });
                  if (!success)
                    return;
                }
                //compare server-side statistics and deliver the results
                inference::ModelStatistics end_status;
                // ... (getServerSideStatus(), summarizeServerStats(), getResults(), finish(true) elided) ...
              },
              options_,
              inputsTriton,
              outputsTriton,
              headers_,
              compressionAlgo_),
          "evaluate(): unable to launch async run",
          isLocal_);
    });
  } else {
    //blocking call
    std::vector<tc::InferResult*> resultsTmp;
    success = handle_exception([&]() {
      TRITON_THROW_IF_ERROR(
          client_->InferMulti(&resultsTmp, options_, inputsTriton, outputsTriton, headers_, compressionAlgo_),
          "evaluate(): unable to run and/or get result",
          isLocal_);
    });
    //immediately convert the raw pointers to shared_ptr
    const auto& results = convertToShared(resultsTmp);
    if (!success)
      return;

    inference::ModelStatistics end_status;
    success = handle_exception([&]() { end_status = getServerSideStatus(); });
    if (!success)
      return;

    const auto& stats = summarizeServerStats(start_status, end_status);
    reportServerSideStats(stats);
    // ... (getResults(results) and finish(true) elided) ...
  }
}
void TritonClient::reportServerSideStats(const ServerSideStats& stats) const {
  std::stringstream msg;

  const uint64_t count = stats.success_count_;
  msg << "  Inference count: " << stats.inference_count_ << "\n";
  msg << "  Execution count: " << stats.execution_count_ << "\n";
  msg << "  Successful request count: " << count << "\n";

  if (count > 0) {
    auto get_avg_us = [count](uint64_t tval) {
      constexpr uint64_t us_to_ns = 1000;
      return tval / us_to_ns / count;
    };

    const uint64_t cumm_avg_us = get_avg_us(stats.cumm_time_ns_);
    const uint64_t queue_avg_us = get_avg_us(stats.queue_time_ns_);
    const uint64_t compute_input_avg_us = get_avg_us(stats.compute_input_time_ns_);
    const uint64_t compute_infer_avg_us = get_avg_us(stats.compute_infer_time_ns_);
    const uint64_t compute_output_avg_us = get_avg_us(stats.compute_output_time_ns_);
    const uint64_t compute_avg_us = compute_input_avg_us + compute_infer_avg_us + compute_output_avg_us;
    const uint64_t overhead =
        (cumm_avg_us > queue_avg_us + compute_avg_us) ? (cumm_avg_us - queue_avg_us - compute_avg_us) : 0;

    msg << "  Avg request latency: " << cumm_avg_us << " usec"
        << "\n"
        << "  (overhead " << overhead << " usec + "
        << "queue " << queue_avg_us << " usec + "
        << "compute input " << compute_input_avg_us << " usec + "
        << "compute infer " << compute_infer_avg_us << " usec + "
        << "compute output " << compute_output_avg_us << " usec)" << std::endl;
  }

  // ... (emit msg via the message logger if a debug name is set) ...
}
TritonClient::ServerSideStats TritonClient::summarizeServerStats(const inference::ModelStatistics& start_status,
                                                                 const inference::ModelStatistics& end_status) const {
  TritonClient::ServerSideStats server_stats;

  server_stats.inference_count_ = end_status.inference_count() - start_status.inference_count();
  server_stats.execution_count_ = end_status.execution_count() - start_status.execution_count();
  server_stats.success_count_ =
      end_status.inference_stats().success().count() - start_status.inference_stats().success().count();
  server_stats.cumm_time_ns_ =
      end_status.inference_stats().success().ns() - start_status.inference_stats().success().ns();
  server_stats.queue_time_ns_ =
      end_status.inference_stats().queue().ns() - start_status.inference_stats().queue().ns();
  server_stats.compute_input_time_ns_ =
      end_status.inference_stats().compute_input().ns() - start_status.inference_stats().compute_input().ns();
  server_stats.compute_infer_time_ns_ =
      end_status.inference_stats().compute_infer().ns() - start_status.inference_stats().compute_infer().ns();
  server_stats.compute_output_time_ns_ =
      end_status.inference_stats().compute_output().ns() - start_status.inference_stats().compute_output().ns();

  return server_stats;
}
inference::ModelStatistics TritonClient::getServerSideStatus() const {
  if (verbose_) {
    inference::ModelStatisticsResponse resp;
    TRITON_THROW_IF_ERROR(
        client_->ModelInferenceStatistics(&resp, options_[0].model_name_, options_[0].model_version_, headers_),
        "getServerSideStatus(): unable to get model statistics",
        isLocal_);
    return *(resp.model_stats().begin());
  }
  return inference::ModelStatistics{};
}
void TritonClient::fillPSetDescription(edm::ParameterSetDescription& iDesc) {
  edm::ParameterSetDescription descClient;
  fillBasePSetDescription(descClient);
  // ... (model name/version, config path, server, timeout, verbosity, shared-memory, and
  //      compression parameters elided) ...
  //restrict the timeout unit to the supported choices
  descClient.ifValue(edm::ParameterDescription<std::string>("timeoutUnit", "seconds", false),
                     edm::allowedValues<std::string>("seconds", "milliseconds", "microseconds"));
  descClient.addUntracked<std::vector<std::string>>("outputs", {});
  iDesc.add<edm::ParameterSetDescription>("Client", descClient);
}