#include "grpc_client.h"
#include "grpc_service.pb.h"
#include "model_config.pb.h"
// ... (framework and utility includes elided in this excerpt)

#include "google/protobuf/text_format.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"

#include <experimental/iterator>
// ... (remaining standard library includes elided)

namespace tc = triton::client;

//map the configured compression name onto the corresponding gRPC algorithm
grpc_compression_algorithm getCompressionAlgo(const std::string& name) {
  if (name.empty() or name.compare("none") == 0)
    return grpc_compression_algorithm::GRPC_COMPRESS_NONE;
  else if (name.compare("deflate") == 0)
    return grpc_compression_algorithm::GRPC_COMPRESS_DEFLATE;
  else if (name.compare("gzip") == 0)
    return grpc_compression_algorithm::GRPC_COMPRESS_GZIP;
  else
    throw cms::Exception("GrpcCompression") << "Unknown compression algorithm requested: " << name
                                            << " (choices: none, deflate, gzip)";
}

//convert the raw pointers returned by the triton client library into owning shared_ptrs
std::vector<std::shared_ptr<tc::InferResult>> convertToShared(const std::vector<tc::InferResult*>& tmp) {
  std::vector<std::shared_ptr<tc::InferResult>> results;
  results.reserve(tmp.size());
  std::transform(tmp.begin(), tmp.end(), std::back_inserter(results), [](tc::InferResult* ptr) {
    return std::shared_ptr<tc::InferResult>(ptr);
  });
  return results;
}
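
// Illustrative sketch, not part of the original file: the triton client library hands back
// raw InferResult pointers that the caller must delete, so they are wrapped in shared_ptr
// right away and their lifetime then follows the results vector.
//
//   std::vector<tc::InferResult*> raw;    // filled by InferMulti / AsyncInferMulti
//   auto owned = convertToShared(raw);    // no manual delete needed afterwards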

TritonClient::TritonClient(const edm::ParameterSet& params, const std::string& debugName)
    : // ... (base class and other members initialized here; elided in this excerpt)
      manualBatchMode_(false),
      verbose_(params.getUntrackedParameter<bool>("verbose")),
      useSharedMemory_(params.getUntrackedParameter<bool>("useSharedMemory")),
      compressionAlgo_(getCompressionAlgo(params.getUntrackedParameter<std::string>("compression"))) {
  // ...
  //server: address and SSL settings chosen by the TritonService (ts->serverInfo(...); elided in this excerpt)
  TRITON_THROW_IF_ERROR(
      tc::InferenceServerGrpcClient::Create(&client_, server.url, false, server.useSsl, server.sslOptions),
      "TritonClient(): unable to create inference context");
  // ...
  //"timeout" is configured in seconds; the triton client expects microseconds
  options_[0].client_timeout_ = params.getUntrackedParameter<unsigned>("timeout") * 1e6;

  //load the local copy of the model configuration
  inference::ModelConfig localModelConfig;
  // ... (localModelConfigPath: path to the model's config.pbtxt in the local repository; definition elided)
  int fileDescriptor = open(localModelConfigPath.c_str(), O_RDONLY);
  if (fileDescriptor < 0)
    throw TritonException("LocalFailure")
        << "TritonClient(): unable to open local model config: " << localModelConfigPath;
  google::protobuf::io::FileInputStream localModelConfigInput(fileDescriptor);
  localModelConfigInput.SetCloseOnDelete(true);
  if (!google::protobuf::TextFormat::Parse(&localModelConfigInput, &localModelConfig))
    throw TritonException("LocalFailure")
        << "TritonClient(): unable to parse local model config: " << localModelConfigPath;

  //fetch the model configuration known to the server
  inference::ModelConfigResponse modelConfigResponse;
  TRITON_THROW_IF_ERROR(
      client_->ModelConfig(&modelConfigResponse, options_[0].model_name_, options_[0].model_version_, headers_),
      "TritonClient(): unable to get model config");
  inference::ModelConfig remoteModelConfig(modelConfigResponse.config());

  //collect checksums reported by the "checksum" repository agent in both configs
  std::map<std::string, std::array<std::string, 2>> checksums;
  size_t fileCounter = 0;
  for (const auto& modelConfig : {localModelConfig, remoteModelConfig}) {
    const auto& agents = modelConfig.model_repository_agents().agents();
    auto agent = std::find_if(agents.begin(), agents.end(), [](auto const& a) { return a.name() == "checksum"; });
    if (agent != agents.end()) {
      const auto& params = agent->parameters();
      for (const auto& [key, val] : params) {
        //only consider files belonging to the requested model version (keys are prefixed with "<version>/")
        if (key.compare(0, options_[0].model_version_.size() + 1, options_[0].model_version_ + "/") == 0)
          checksums[key][fileCounter] = val;
      }
    }
    ++fileCounter;
  }
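
  // Illustrative layout, not part of the original file: after the loop above, each file of
  // the requested model version carries up to two checksums, index 0 from the local config
  // and index 1 from the remote config, e.g.
  //
  //   checksums["1/model.graphdef"] = { "<local checksum>", "<remote checksum>" };
  //
  // (the key prefix "1/" is the model version; the file name and values are made up here)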

  //stop if any file's local and remote checksums disagree
  std::vector<std::string> incorrect;
  for (const auto& [key, val] : checksums) {
    if (checksums[key][0] != checksums[key][1])
      incorrect.push_back(key);
  }
  if (!incorrect.empty())
    throw TritonException("ModelVersioning") << "The following files have incorrect checksums on the remote server: "
                                             << triton_utils::printColl(incorrect, ", ");

  //get model metadata from the server
  inference::ModelMetadataResponse modelMetadata;
  TRITON_THROW_IF_ERROR(
      client_->ModelMetadata(&modelMetadata, options_[0].model_name_, options_[0].model_version_, headers_),
      "TritonClient(): unable to get model metadata");

  //get input and output metadata
  const auto& nicInputs = modelMetadata.inputs();
  const auto& nicOutputs = modelMetadata.outputs();

  //report all model errors at once
  std::stringstream msg;
  std::string msg_str;

  //currently no use case is foreseen for a model with zero inputs or outputs
  if (nicInputs.empty())
    msg << "Model on server appears malformed (zero inputs)\n";

  if (nicOutputs.empty())
    msg << "Model on server appears malformed (zero outputs)\n";

  //stop if errors were found
  msg_str = msg.str();
  if (!msg_str.empty())
    throw cms::Exception("ModelErrors") << msg_str;

  //setup input map
  std::stringstream io_msg;
  // ...
  io_msg << "Model inputs: "
         << "\n";
  for (const auto& nicInput : nicInputs) {
    const auto& iname = nicInput.name();
    //ts: handle to the TritonService obtained earlier in the constructor (elided in this excerpt)
    auto [curr_itr, success] = input_.emplace(std::piecewise_construct,
                                              std::forward_as_tuple(iname),
                                              std::forward_as_tuple(iname, nicInput, this, ts->pid()));
    auto& curr_input = curr_itr->second;
    // ...
    io_msg << "  " << iname << " (" << curr_input.dname() << ", " << curr_input.byteSize()
           /* ... rest of the per-input printout elided ... */;
  }

  //allow selecting only some outputs from the server
  const auto& v_outputs = params.getUntrackedParameter<std::vector<std::string>>("outputs");
  std::unordered_set s_outputs(v_outputs.begin(), v_outputs.end());

  //setup output map
  // ...
  io_msg << "Model outputs: "
         << "\n";
  for (const auto& nicOutput : nicOutputs) {
    const auto& oname = nicOutput.name();
    //skip any outputs that were not requested
    if (!s_outputs.empty() and s_outputs.find(oname) == s_outputs.end())
      continue;
    auto [curr_itr, success] = output_.emplace(std::piecewise_construct,
                                               std::forward_as_tuple(oname),
                                               std::forward_as_tuple(oname, nicOutput, this, ts->pid()));
    auto& curr_output = curr_itr->second;
    // ...
    io_msg << "  " << oname << " (" << curr_output.dname() << ", " << curr_output.byteSize()
           /* ... rest of the per-output printout elided ... */;
    //keep track of which requested outputs were found on the server
    if (!s_outputs.empty())
      s_outputs.erase(oname);
  }

  //any name still left in s_outputs was requested but not provided by the model
  if (!s_outputs.empty())
    throw cms::Exception("MissingOutput") << "Some requested outputs were not available on the server: "
                                          << triton_utils::printColl(s_outputs);

  //print model information
  std::stringstream model_msg;
  // ...
  model_msg << "Model name: " << options_[0].model_name_ << "\n"
            << "Model version: " << options_[0].model_version_ << "\n";
  // ... (rest of the constructor elided in this excerpt)
}

//resize the per-entry containers of every input and output
void TritonClient::resizeEntries(unsigned entry) {
  // ...
  for (auto& element : input_) {
    element.second.entries_.resize(entry);
  }
  for (auto& element : output_) {
    element.second.entries_.resize(entry);
  }
  // ...
}

//append a new entry to every input and output
void TritonClient::addEntry(unsigned entry) {
  for (auto& element : input_) {
    element.second.addEntryImpl(entry);
  }
  for (auto& element : output_) {
    element.second.addEntryImpl(entry);
  }
  // ...
}

//reset all input and output data
void TritonClient::reset() {
  // ...
  for (auto& element : input_) {
    element.second.reset();
  }
  for (auto& element : output_) {
    element.second.reset();
  }
}

template <typename F>
bool TritonClient::handle_exception(F&& call) {
  try {
    call();
    return true;
  }
  //TritonExceptions are recoverable: demote to a warning and report the failure via finish()
  catch (TritonException& e) {
    e.convertToWarning();
    finish(false);
    return false;
  }
  //any other exception is propagated through finish() as-is
  catch (...) {
    finish(false, std::current_exception());
    return false;
  }
}
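
// Illustrative usage, not part of the original file: the pattern used in evaluate() below.
// The wrapped call either succeeds, or the failure is routed through finish() and false is
// returned (the liveness check here is just a stand-in call).
//
//   bool live = false;
//   bool ok = handle_exception(
//       [&]() { TRITON_THROW_IF_ERROR(client_->IsServerLive(&live), "server liveness check failed"); });
//   if (!ok)
//     return;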

void TritonClient::getResults(const std::vector<std::shared_ptr<tc::InferResult>>& results) {
  for (unsigned i = 0; i < results.size(); ++i) {
    const auto& result = results[i];
    for (auto& [oname, output] : output_) {
      //for outputs with variable dimensions, get the concrete shape from the result
      if (output.variableDims()) {
        std::vector<int64_t> tmp_shape;
        TRITON_THROW_IF_ERROR(result->Shape(oname, &tmp_shape),
                              "getResults(): unable to get output shape for " + oname);
        //drop the leading batch dimension reported by the server
        tmp_shape.erase(tmp_shape.begin());
        // ... (the reduced shape is then assigned to the corresponding output entry)
      }
      // ...
    }
  }
  // ...
}
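
// Illustrative sketch, not part of the original file: what the erase above does to a
// server-reported shape that carries a leading batch dimension.
//
//   std::vector<int64_t> shape{4, 10};   // batch of 4 entries, 10 values each
//   shape.erase(shape.begin());          // per-entry shape left: {10}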

void TritonClient::evaluate() {
  // ...
  //used by the early-return path when there is nothing to process (elided in this excerpt)
  std::vector<std::shared_ptr<tc::InferResult>> empty_results;
  // ...
  //one entry per request in multi-request (manual batching) mode
  const unsigned nEntriesVal = nEntries();

  //set up the per-entry input pointers expected by the triton client
  std::vector<std::vector<triton::client::InferInput*>> inputsTriton(nEntriesVal);
  for (auto& inputTriton : inputsTriton) {
    inputTriton.reserve(input_.size());
  }
  for (auto& [iname, input] : input_) {
    for (unsigned i = 0; i < nEntriesVal; ++i) {
      inputsTriton[i].push_back(input.data(i));
    }
  }

  //set up the per-entry output pointers similarly
  std::vector<std::vector<const triton::client::InferRequestedOutput*>> outputsTriton(nEntriesVal);
  for (auto& outputTriton : outputsTriton) {
    outputTriton.reserve(output_.size());
  }
  for (auto& [oname, output] : output_) {
    for (unsigned i = 0; i < nEntriesVal; ++i) {
      outputsTriton[i].push_back(output.data(i));
    }
  }

  //prepare each output before the call
  for (auto& element : output_) {
    element.second.prepare();
  }
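
  // Illustrative layout, not part of the original file: for nEntriesVal = 2 and two inputs
  // "x" and "y", the nested vectors built above are
  //
  //   inputsTriton[0] = { x.data(0), y.data(0) }
  //   inputsTriton[1] = { x.data(1), y.data(1) }
  //
  // i.e. one flat list of input (and, likewise, requested-output) pointers per entry,
  // which is the shape expected by the InferMulti/AsyncInferMulti calls below.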

  //get server-side inference statistics before the call (used for the stats report)
  inference::ModelStatistics start_status;
  // ...

  if (mode_ == SonicMode::Async) {
    //non-blocking call: launch the request and handle the results in a completion callback
    auto success = handle_exception([&]() {
      TRITON_THROW_IF_ERROR(
          client_->AsyncInferMulti(
              [start_status, this](std::vector<tc::InferResult*> resultsTmp) {
                //immediately convert the raw results to shared_ptr to take ownership
                const auto& results = convertToShared(resultsTmp);
                //check that each request in the batch succeeded
                for (auto ptr : results) {
                  auto success = handle_exception(
                      [&]() { TRITON_THROW_IF_ERROR(ptr->RequestStatus(), "evaluate(): unable to get result(s)"); });
                  if (!success)
                    return;
                }
                //get server-side statistics after the call
                inference::ModelStatistics end_status;
                // ... (stats summary, getResults(), and finish() elided in this excerpt)
              },
              options_,
              inputsTriton,
              outputsTriton,
              headers_,
              compressionAlgo_),
          "evaluate(): unable to launch async run");
    });
    if (!success)
      return;
  } else {
    //blocking call
    std::vector<tc::InferResult*> resultsTmp;
    bool success = handle_exception([&]() {
      TRITON_THROW_IF_ERROR(
          client_->InferMulti(&resultsTmp, options_, inputsTriton, outputsTriton, headers_, compressionAlgo_),
          "evaluate(): unable to run and/or get result");
    });
    //convert to shared_ptr immediately so the raw results are owned even if the call failed
    const auto& results = convertToShared(resultsTmp);
    if (!success)
      return;

    //get server-side statistics after the call
    inference::ModelStatistics end_status;
    success = handle_exception([&]() { end_status = getServerSideStatus(); });
    if (!success)
      return;

    const auto& stats = summarizeServerStats(start_status, end_status);
    reportServerSideStats(stats);
    // ... (getResults() and finish() elided in this excerpt)
  }
}
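
// Illustrative sketch, not part of the original file: the blocking call pattern above,
// reduced to its essentials ("client", "options", "inputs", and "outputs" stand in for
// the corresponding members of this class).
//
//   std::vector<tc::InferResult*> raw;
//   tc::Error err = client->InferMulti(&raw, options, inputs, outputs);
//   auto owned = convertToShared(raw);   // always take ownership first
//   if (!err.IsOk()) { /* report the failure and bail out */ }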

void TritonClient::reportServerSideStats(const TritonClient::ServerSideStats& stats) const {
  std::stringstream msg;
  // ...
  //number of successful requests covered by these statistics
  const uint64_t count = stats.success_count_;
  msg << "  Inference count: " << stats.inference_count_ << "\n";
  msg << "  Execution count: " << stats.execution_count_ << "\n";
  msg << "  Successful request count: " << count << "\n";

  //server-side times are reported in nanoseconds; convert the averages to microseconds
  auto get_avg_us = [count](uint64_t tval) {
    constexpr uint64_t us_to_ns = 1000;
    return tval / us_to_ns / count;
  };

  const uint64_t cumm_avg_us = get_avg_us(stats.cumm_time_ns_);
  const uint64_t queue_avg_us = get_avg_us(stats.queue_time_ns_);
  const uint64_t compute_input_avg_us = get_avg_us(stats.compute_input_time_ns_);
  const uint64_t compute_infer_avg_us = get_avg_us(stats.compute_infer_time_ns_);
  const uint64_t compute_output_avg_us = get_avg_us(stats.compute_output_time_ns_);
  const uint64_t compute_avg_us = compute_input_avg_us + compute_infer_avg_us + compute_output_avg_us;
  //whatever is not accounted for by queue or compute time is attributed to overhead (clamped at 0)
  const uint64_t overhead =
      (cumm_avg_us > queue_avg_us + compute_avg_us) ? (cumm_avg_us - queue_avg_us - compute_avg_us) : 0;

  msg << "  Avg request latency: " << cumm_avg_us << " usec"
      << "\n"
      << "  (overhead " << overhead << " usec + "
      << "queue " << queue_avg_us << " usec + "
      << "compute input " << compute_input_avg_us << " usec + "
      << "compute infer " << compute_infer_avg_us << " usec + "
      << "compute output " << compute_output_avg_us << " usec)" << std::endl;
  // ... (the assembled message is printed via the framework logger; elided in this excerpt)
}

TritonClient::ServerSideStats TritonClient::summarizeServerStats(const inference::ModelStatistics& start_status,
                                                                 const inference::ModelStatistics& end_status) const {
  TritonClient::ServerSideStats server_stats;

  //report the difference between the statistics before and after this call
  server_stats.inference_count_ = end_status.inference_count() - start_status.inference_count();
  server_stats.execution_count_ = end_status.execution_count() - start_status.execution_count();
  server_stats.success_count_ =
      end_status.inference_stats().success().count() - start_status.inference_stats().success().count();
  server_stats.cumm_time_ns_ =
      end_status.inference_stats().success().ns() - start_status.inference_stats().success().ns();
  server_stats.queue_time_ns_ =
      end_status.inference_stats().queue().ns() - start_status.inference_stats().queue().ns();
  server_stats.compute_input_time_ns_ =
      end_status.inference_stats().compute_input().ns() - start_status.inference_stats().compute_input().ns();
  server_stats.compute_infer_time_ns_ =
      end_status.inference_stats().compute_infer().ns() - start_status.inference_stats().compute_infer().ns();
  server_stats.compute_output_time_ns_ =
      end_status.inference_stats().compute_output().ns() - start_status.inference_stats().compute_output().ns();

  return server_stats;
}
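
// Worked example, not part of the original file: the per-field deltas computed above,
// divided by the success count in reportServerSideStats, give the reported averages.
// With cumm_avg_us = 1500, queue_avg_us = 200, and compute_avg_us = 1100, the printed
// breakdown is
//
//   overhead = 1500 - 200 - 1100 = 200 usec
//
// and if queue + compute ever exceeded the cumulative average (possible with coarse
// server counters), the ternary in reportServerSideStats clamps the overhead to 0
// instead of letting the unsigned subtraction wrap around.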

inference::ModelStatistics TritonClient::getServerSideStatus() const {
  if (verbose_) {
    inference::ModelStatisticsResponse resp;
    TRITON_THROW_IF_ERROR(
        client_->ModelInferenceStatistics(&resp, options_[0].model_name_, options_[0].model_version_, headers_),
        "getServerSideStatus(): unable to get model statistics");
    return *(resp.model_stats().begin());
  }
  return inference::ModelStatistics{};
}

void TritonClient::fillPSetDescription(edm::ParameterSetDescription& iDesc) {
  edm::ParameterSetDescription descClient;
  fillBasePSetDescription(descClient);
  // ... (other client parameters elided in this excerpt)
  descClient.addUntracked<std::vector<std::string>>("outputs", {});
  // ... (remaining parameters and the final registration of descClient in iDesc elided)
}