
tensorflow::NTSession Class Reference

#include <NTSession.h>

Inheritance diagram for tensorflow::NTSession:
Session

Classes

struct  ExecutorsAndKeys
 
struct  FunctionInfo
 
struct  PerPartitionExecutorsAndLib
 
struct  RunState
 
struct  RunStateArgs
 

Public Types

typedef std::function< void(Session *)> CloseCallback
 
typedef std::vector< std::pair< string, Tensor > > NamedTensorList
 
typedef std::unordered_map< StringPiece, Node *, StringPieceHasher > NameNodeMap
 

Public Member Functions

::tensorflow::Status Close () override
 
::tensorflow::Status Create (const GraphDef &graph) override
 
void ExportCostModels (CostModelManager::CostModelMap *cost_models)
 
::tensorflow::Status Extend (const GraphDef &graph) override
 
::tensorflow::Status ListDevices (std::vector< DeviceAttributes > *response) override
 
::tensorflow::Status LocalDeviceManager (const DeviceMgr **output) override
 
 NTSession (const SessionOptions &options, const DeviceMgr *device_mgr, NTSessionFactory *factory)
 
::tensorflow::Status PRun (const string &handle, const NamedTensorList &inputs, const std::vector< string > &output_names, std::vector< Tensor > *outputs) override
 
::tensorflow::Status PRunSetup (const std::vector< string > &input_names, const std::vector< string > &output_names, const std::vector< string > &target_nodes, string *handle) override
 
::tensorflow::Status Reset (const std::vector< string > &containers)
 
::tensorflow::Status Run (const NamedTensorList &inputs, const std::vector< string > &output_names, const std::vector< string > &target_nodes, std::vector< Tensor > *outputs) override
 
::tensorflow::Status Run (const ::tensorflow::RunOptions &run_options, const NamedTensorList &inputs, const std::vector< string > &output_names, const std::vector< string > &target_nodes, std::vector< Tensor > *outputs, RunMetadata *run_metadata) override
 
 ~NTSession () override
 

Private Member Functions

::tensorflow::Status CheckFetch (const std::vector< std::pair< string, Tensor >> &feeds, const std::vector< string > &fetches, const ExecutorsAndKeys *executors_and_keys, const RunState *run_state)
 
::tensorflow::Status CheckNotClosed ()
 
::tensorflow::Status CreateDebuggerState (const DebugOptions &debug_options, int64 session_run_index, int64 executor_step_index, const std::vector< string > &input_names, const std::vector< string > &output_names, const std::vector< string > &target_names, std::unique_ptr< DebuggerStateInterface > *debugger_state)
 
::tensorflow::Status CreateGraphs (const BuildGraphOptions &options, std::unordered_map< string, std::unique_ptr< Graph >> *outputs, std::unique_ptr< FunctionLibraryDefinition > *flib_def, RunStateArgs *run_state_args, DataTypeVector *input_types, DataTypeVector *output_types)
 
::tensorflow::Status DecorateAndPublishGraphForDebug (const DebugOptions &debug_options, Graph *graph, Device *device)
 
::tensorflow::Status ExtendLocked (const GraphDef &graph) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
 
::tensorflow::Status GetOrCreateExecutors (gtl::ArraySlice< string > inputs, gtl::ArraySlice< string > outputs, gtl::ArraySlice< string > target_nodes, ExecutorsAndKeys **executors_and_keys, RunStateArgs *run_state_args)
 
bool graph_created_ GUARDED_BY (graph_def_lock_)
 
GraphDef graph_def_ GUARDED_BY (graph_def_lock_)
 
std::vector< std::unique_ptr< FunctionInfo > > functions_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, std::shared_ptr< ExecutorsAndKeys > > executors_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, std::unique_ptr< RunState > > partial_runs_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, string > stateful_placements_ GUARDED_BY (graph_def_lock_)
 
std::unique_ptr< GraphExecutionState > execution_state_ GUARDED_BY (graph_def_lock_)
 
bool closed_ GUARDED_BY (closed_lock_)
 
Status MaybeInitializeExecutionState (const GraphDef &graph, bool *out_already_initialized) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
 
::tensorflow::Status RecvPRunOutputs (const std::vector< string > &output_names, const ExecutorsAndKeys *executors_and_keys, RunState *run_state, std::vector< Tensor > *outputs)
 
::tensorflow::Status ResourceHandleToInputTensor (const Tensor &resource_tensor, Tensor *retrieved_tensor)
 
void SchedClosure (std::function< void()> c)
 
::tensorflow::Status SendPRunInputs (const std::vector< std::pair< string, Tensor >> &inputs, const ExecutorsAndKeys *executors_and_keys, IntraProcessRendezvous *rendez)
 
 TF_DISALLOW_COPY_AND_ASSIGN (NTSession)
 
::tensorflow::Status WaitForNotification (Notification *n, int64 timeout_in_ms)
 
void WaitForNotification (RunState *run_state, CancellationManager *cm, int64 timeout_in_ms)
 

Private Attributes

CancellationManager * cancellation_manager_
 
mutex closed_lock_
 
CostModelManager cost_model_manager_
 
const std::unique_ptr< const DeviceMgr > device_mgr_
 
DeviceSet device_set_
 
std::vector< Device * > devices_
 
std::atomic< int64 > edge_name_counter_ = {0}
 
mutex executor_lock_
 
NTSessionFactory *const factory_
 
std::unique_ptr< FunctionLibraryDefinition > flib_def_
 
mutex graph_def_lock_
 
std::atomic< int64 > handle_name_counter_ = {0}
 
Status init_error_
 
Executor::Args::NodeOutputsCallback node_outputs_callback_ = nullptr
 
const int64 operation_timeout_in_ms_ = 0
 
const SessionOptions options_
 
string session_handle_
 
SessionState session_state_
 
bool sync_on_finish_ = true
 

Static Private Attributes

static std::atomic_int_fast64_t step_id_counter_
 

Friends

class DebugGateway
 

Detailed Description

Definition at line 71 of file NTSession.h.

Member Typedef Documentation

typedef std::function<void(Session*)> tensorflow::NTSession::CloseCallback

Definition at line 73 of file NTSession.h.

typedef std::vector<std::pair<string, Tensor> > tensorflow::NTSession::NamedTensorList

Definition at line 82 of file NTSession.h.
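
For illustration, a feed list for Run() or PRun() is just a vector of (node name, tensor) pairs. A small sketch; the node name "input" and the tensor shape are placeholders:

// Hypothetical feed: a 1x3 float tensor bound to a graph node called "input".
tensorflow::Tensor x(tensorflow::DT_FLOAT, tensorflow::TensorShape({1, 3}));
x.flat<float>().setZero();
tensorflow::NTSession::NamedTensorList feeds = {{"input", x}};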

typedef std::unordered_map<StringPiece, Node*, StringPieceHasher> tensorflow::NTSession::NameNodeMap

Definition at line 83 of file NTSession.h.

Constructor & Destructor Documentation

tensorflow::NTSession::NTSession ( const SessionOptions &  options,
const DeviceMgr *  device_mgr,
NTSessionFactory *  factory 
)

Definition at line 191 of file NTSession.cc.

References ztail::d, device_mgr_, device_set_, devices_, dqm::qstatus::ERROR, unpackBuffers-CaloStage2::INFO, LOG, session_handle_, btagGenBb_cfi::Status, mps_update::status, and sync_on_finish_.

192  : options_(options),
193  device_mgr_(device_mgr),
194  factory_(factory),
195  cancellation_manager_(new CancellationManager()),
196  operation_timeout_in_ms_(options_.config.operation_timeout_in_ms()) {
197  // The default value of sync_on_finish will be flipped soon and this
198  // environment variable will be removed as well.
199  const Status status = ReadBoolFromEnvVar("TF_SYNC_ON_FINISH", true, &sync_on_finish_);
200  if (!status.ok()) {
201  LOG(ERROR) << status.error_message();
202  }
203  // NOTE(mrry): We do not need to use a unique string for the session
204  // handle, because NTSession owns its devices. This may change
205  // in future versions.
206  session_handle_ = "no_threads";
207  int devices_added = 0;
208  if (options.config.log_device_placement()) {
209  const string mapping_str = device_mgr_->DeviceMappingString();
210  if (mapping_str.empty()) {
211  printf("Device mapping: no known devices.\n");
212  } else {
213  printf("Device mapping:\n%s", mapping_str.c_str());
214  }
215  LOG(INFO) << "Device mapping:\n" << mapping_str;
216  }
217  for (auto d : device_mgr_->ListDevices()) {
218  devices_.push_back(d);
219  device_set_.AddDevice(d);
220  d->op_segment()->AddHold(session_handle_);
221 
222  // The first device added is special: it is the 'client device' (a
223  // CPU device) from which we feed and fetch Tensors.
224  if (devices_added == 0) {
225  device_set_.set_client_device(d);
226  }
227  ++devices_added;
228  }
229  }
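
NTSession objects are normally obtained through TensorFlow's Session factory machinery rather than constructed directly. A minimal sketch, assuming the accompanying NTSessionFactory accepts a dedicated target string (the "no_threads" handle above suggests such a target, but the exact string is an assumption):

#include "tensorflow/core/public/session.h"

tensorflow::SessionOptions opts;
opts.target = "no_threads";                  // assumed to select NTSessionFactory
opts.config.set_log_device_placement(true);  // triggers the device-mapping printout above
tensorflow::Session* session = nullptr;
tensorflow::Status status = tensorflow::NewSession(opts, &session);
if (!status.ok()) {
  LOG(ERROR) << status.error_message();
}
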
tensorflow::NTSession::~NTSession ( )
override

Definition at line 231 of file NTSession.cc.

References cancellation_manager_, Close(), ztail::d, device_mgr_, flib_def_, and session_handle_.

231  {
232  if (!closed_)
233  Close().IgnoreError();
234  for (auto& it : partial_runs_) {
235  it.second.reset(nullptr);
236  }
237  for (auto& it : executors_) {
238  it.second.reset();
239  }
240  for (auto d : device_mgr_->ListDevices()) {
241  d->op_segment()->RemoveHold(session_handle_);
242  }
243  for (auto d : device_mgr_->ListDevices()) {
244  d->ClearResourceMgr();
245  }
246  functions_.clear();
247  delete cancellation_manager_;
248 
249  execution_state_.reset(nullptr);
250  flib_def_.reset(nullptr);
251  }

Member Function Documentation

Status tensorflow::NTSession::CheckFetch ( const std::vector< std::pair< string, Tensor >> &  feeds,
const std::vector< string > &  fetches,
const ExecutorsAndKeys *  executors_and_keys,
const RunState *  run_state 
)
private

Definition at line 854 of file NTSession.cc.

References executor_lock_, cond::persistency::fetch(), dqmdumpme::first, tensorflow::NTSession::ExecutorsAndKeys::graph, triggerObjects_cff::id, input, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, tensorflow::NTSession::ExecutorsAndKeys::name_to_node, edm::errors::NotFound, tensorflow::NTSession::RunState::pending_inputs, svgfig::stack, and class-composition::visited.

Referenced by PRun().

857  {
858  const Graph* graph = executors_and_keys->graph.get();
859  const NameNodeMap* name_to_node = &executors_and_keys->name_to_node;
860 
861  // Build the set of pending feeds that we haven't seen.
862  std::unordered_set<TensorId, TensorId::Hasher> pending_feeds;
863  {
864  mutex_lock l(executor_lock_);
865  for (const auto& input : run_state->pending_inputs) {
866  // Skip if the feed has already been fed.
867  if (input.second)
868  continue;
869  TensorId id(ParseTensorName(input.first));
870  auto it = name_to_node->find(id.first);
871  if (it == name_to_node->end()) {
872  return errors::NotFound("Feed ", input.first, ": not found");
873  }
874  pending_feeds.insert(id);
875  }
876  }
877  for (const auto& it : feeds) {
878  TensorId id(ParseTensorName(it.first));
879  pending_feeds.erase(id);
880  }
881 
882  // Initialize the stack with the fetch nodes.
883  std::vector<const Node*> stack;
884  for (const string& fetch : fetches) {
885  TensorId id(ParseTensorName(fetch));
886  auto it = name_to_node->find(id.first);
887  if (it == name_to_node->end()) {
888  return errors::NotFound("Fetch ", fetch, ": not found");
889  }
890  stack.push_back(it->second);
891  }
892 
893  // Any tensor needed for fetches can't be in pending_feeds.
894  std::vector<bool> visited(graph->num_node_ids(), false);
895  while (!stack.empty()) {
896  const Node* n = stack.back();
897  stack.pop_back();
898 
899  for (const Edge* in_edge : n->in_edges()) {
900  const Node* in_node = in_edge->src();
901  if (pending_feeds.count({in_node->name(), in_edge->src_output()}) > 0) {
902  return errors::InvalidArgument("Fetch ",
903  in_node->name(),
904  ":",
905  in_edge->src_output(),
906  " can't be computed from the feeds"
907  " that have been fed so far.");
908  }
909  if (!visited[in_node->id()]) {
910  visited[in_node->id()] = true;
911  stack.push_back(in_node);
912  }
913  }
914  }
915  return Status::OK();
916  }
::tensorflow::Status tensorflow::NTSession::CheckNotClosed ( )
inlineprivate
tensorflow::Status tensorflow::NTSession::Close ( )
override

Definition at line 1325 of file NTSession.cc.

References cancellation_manager_, closed_lock_, tensorflow::NTSessionFactory::Deregister(), factory_, and cmsLHEtoEOSManager::l.

Referenced by ~NTSession().

1325  {
1326  cancellation_manager_->StartCancel();
1327  {
1328  mutex_lock l(closed_lock_);
1329  if (closed_)
1330  return ::tensorflow::Status::OK();
1331  closed_ = true;
1332  }
1333  if (factory_ != nullptr)
1334  factory_->Deregister(this);
1335  return ::tensorflow::Status::OK();
1336  }
Status tensorflow::NTSession::Create ( const GraphDef &  graph)
override

Definition at line 281 of file NTSession.cc.

References ExtendLocked(), graph_def_lock_, init_error_, and cmsLHEtoEOSManager::l.

281  {
282  TF_RETURN_IF_ERROR(init_error_);
283  if (graph.node_size() > 0) {
284  mutex_lock l(graph_def_lock_);
285  if (graph_created_) {
286  return errors::AlreadyExists("A Graph has already been created for this session.");
287  }
288  return ExtendLocked(graph);
289  }
290  return Status::OK();
291  }
Status tensorflow::NTSession::CreateDebuggerState ( const DebugOptions &  debug_options,
int64  session_run_index,
int64  executor_step_index,
const std::vector< string > &  input_names,
const std::vector< string > &  output_names,
const std::vector< string > &  target_names,
std::unique_ptr< DebuggerStateInterface > *  debugger_state 
)
private

Definition at line 321 of file NTSession.cc.

References pfDeepBoostedJetPreprocessParams_cfi::input_names.

Referenced by CheckNotClosed(), and DecorateAndPublishGraphForDebug().

327  {
328  TF_RETURN_IF_ERROR(DebuggerStateRegistry::CreateState(debug_options, debugger_state));
329  TF_RETURN_IF_ERROR(debugger_state->get()->PublishDebugMetadata(
330  debug_options.global_step(), session_run_index, executor_step_index, input_names, output_names, target_names));
331  return Status::OK();
332  }
Status tensorflow::NTSession::CreateGraphs ( const BuildGraphOptions &  options,
std::unordered_map< string, std::unique_ptr< Graph >> *  outputs,
std::unique_ptr< FunctionLibraryDefinition > *  flib_def,
RunStateArgs *  run_state_args,
DataTypeVector *  input_types,
DataTypeVector *  output_types 
)
private

Definition at line 1158 of file NTSession.cc.

References KineDebug3::count(), ztail::d, device_mgr_, device_set_, devices_, edge_name_counter_, flib_def_, tensorflow::NTSession::RunStateArgs::graph, graph_def_lock_, tensorflow::NTSession::RunStateArgs::is_partial_run, cmsLHEtoEOSManager::l, eostools::move(), Skims_PA_cff::name, options_, PatBasicFWLiteJetAnalyzer_Selector_cfg::outputs, ZMuMuAnalysisNtupler_cff::prefix, alignCSCRings::s, btagGenBb_cfi::Status, and std::swap().

Referenced by GetOrCreateExecutors().

1163  {
1164  mutex_lock l(graph_def_lock_);
1165  std::unique_ptr<ClientGraph> client_graph;
1166 
1167  std::unique_ptr<GraphExecutionState> temp_exec_state_holder;
1168  GraphExecutionState* execution_state = nullptr;
1169  if (options_.config.graph_options().place_pruned_graph()) {
1170  // Because we are placing pruned graphs, we need to create a
1171  // new GraphExecutionState for every new unseen graph,
1172  // and then place it.
1173  GraphExecutionStateOptions prune_options;
1174  prune_options.device_set = &device_set_;
1175  prune_options.session_options = &options_;
1176  prune_options.stateful_placements = stateful_placements_;
1177  TF_RETURN_IF_ERROR(GraphExecutionState::MakeForPrunedGraph(execution_state_->original_graph_def().library(),
1178  prune_options,
1179  execution_state_->original_graph_def(),
1180  subgraph_options,
1181  &temp_exec_state_holder,
1182  &client_graph));
1183  execution_state = temp_exec_state_holder.get();
1184  } else {
1185  execution_state = execution_state_.get();
1186  TF_RETURN_IF_ERROR(execution_state->BuildGraph(subgraph_options, &client_graph));
1187  }
1188 
1189  if (subgraph_options.feed_endpoints.size() != client_graph->feed_types.size()) {
1190  return errors::Internal("Graph pruning failed: requested number of feed endpoints = ",
1191  subgraph_options.feed_endpoints.size(),
1192  " versus number of pruned feed endpoints = ",
1193  client_graph->feed_types.size());
1194  }
1195  if (subgraph_options.fetch_endpoints.size() != client_graph->fetch_types.size()) {
1196  return errors::Internal("Graph pruning failed: requested number of fetch endpoints = ",
1197  subgraph_options.fetch_endpoints.size(),
1198  " versus number of pruned fetch endpoints = ",
1199  client_graph->fetch_types.size());
1200  }
1201 
1202  auto current_stateful_placements = execution_state->GetStatefulPlacements();
1203  // Update our current state based on the execution_state's
1204  // placements. If there are any mismatches for a node,
1205  // we should fail, as this should never happen.
1206  for (auto placement_pair : current_stateful_placements) {
1207  const string& node_name = placement_pair.first;
1208  const string& placement = placement_pair.second;
1209  auto iter = stateful_placements_.find(node_name);
1210  if (iter == stateful_placements_.end()) {
1211  stateful_placements_.insert(std::make_pair(node_name, placement));
1212  } else if (iter->second != placement) {
1213  return errors::Internal(
1214  "Stateful placement mismatch. "
1215  "Current assignment of ",
1216  node_name,
1217  " to ",
1218  iter->second,
1219  " does not match ",
1220  placement);
1221  }
1222  }
1223 
1224  stateful_placements_ = execution_state->GetStatefulPlacements();
1225 
1226  // Remember the graph in run state if this is a partial run.
1227  if (run_state_args->is_partial_run) {
1228  run_state_args->graph.reset(new Graph(flib_def_.get()));
1229  CopyGraph(*execution_state->full_graph(), run_state_args->graph.get());
1230  }
1231 
1232  // Partition the graph across devices.
1233  PartitionOptions popts;
1234  popts.node_to_loc = [](const Node* node) {
1235  assert(node != nullptr);
1236  return node->assigned_device_name();
1237  };
1238  popts.new_name = [this](const string& prefix) {
1239  return strings::StrCat(prefix, "/_", edge_name_counter_.fetch_add(1));
1240  };
1241  popts.get_incarnation = [](const string& name) {
1242  // The direct session does not have changing incarnation numbers.
1243  // Just return '1'.
1244  return 1;
1245  };
1246  popts.flib_def = &client_graph->graph.flib_def();
1247  popts.control_flow_added = false;
1248 
1249  std::unordered_map<string, GraphDef> partitions;
1250  TF_RETURN_IF_ERROR(Partition(popts, &client_graph->graph, &partitions));
1251 
1252  std::vector<string> device_names;
1253  for (auto device : devices_) {
1254  // Extract the LocalName from the device.
1255  device_names.push_back(DeviceNameUtils::LocalName(device->name()));
1256  }
1257 
1258  // Check for valid partitions.
1259  for (const auto& partition : partitions) {
1260  const string local_partition_name = DeviceNameUtils::LocalName(partition.first);
1261  if (std::count(device_names.begin(), device_names.end(), local_partition_name) == 0) {
1262  return errors::InvalidArgument("Creating a partition for ",
1263  local_partition_name,
1264  " which doesn't exist in the list of available devices. Available "
1265  "devices: ",
1266  str_util::Join(device_names, ","));
1267  }
1268  }
1269 
1270  for (const auto& partition : partitions) {
1271  std::unique_ptr<Graph> device_graph(new Graph(client_graph->flib_def.get()));
1272  GraphConstructorOptions device_opts;
1273  // There are internal operations (e.g., send/recv) that we now allow.
1274  device_opts.allow_internal_ops = true;
1275  device_opts.expect_device_spec = true;
1276  TF_RETURN_IF_ERROR(ConvertGraphDefToGraph(device_opts, partition.second, device_graph.get()));
1277  outputs->emplace(partition.first, std::move(device_graph));
1278  }
1279 
1280  GraphOptimizationPassOptions optimization_options;
1281  optimization_options.session_options = &options_;
1282  optimization_options.flib_def = client_graph->flib_def.get();
1283  optimization_options.partition_graphs = outputs;
1284  TF_RETURN_IF_ERROR(OptimizationPassRegistry::Global()->RunGrouping(OptimizationPassRegistry::POST_PARTITIONING,
1285  optimization_options));
1286 
1287  Status s;
1288  for (auto& partition : *outputs) {
1289  const string& partition_name = partition.first;
1290  std::unique_ptr<Graph>* graph = &partition.second;
1291 
1292  VLOG(2) << "Created " << DebugString(graph->get()) << " for " << partition_name;
1293 
1294  // Give the device an opportunity to rewrite its subgraph.
1295  Device* d;
1296  s = device_mgr_->LookupDevice(partition_name, &d);
1297  if (!s.ok())
1298  break;
1299  s = d->MaybeRewriteGraph(graph);
1300  if (!s.ok()) {
1301  break;
1302  }
1303  }
1304  *flib_def = std::move(client_graph->flib_def);
1305  std::swap(*input_types, client_graph->feed_types);
1306  std::swap(*output_types, client_graph->fetch_types);
1307  return s;
1308  }
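
Whether CreateGraphs() builds a temporary pruned GraphExecutionState per call or reuses the session-wide one is controlled by the place_pruned_graph graph option, as the branch at the top of the function shows. A short sketch of enabling it through the public session options:

tensorflow::SessionOptions opts;
// Place each pruned subgraph separately instead of placing the full graph once.
opts.config.mutable_graph_options()->set_place_pruned_graph(true);
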
Status tensorflow::NTSession::DecorateAndPublishGraphForDebug ( const DebugOptions &  debug_options,
Graph *  graph,
Device *  device 
)
private

Definition at line 334 of file NTSession.cc.

References writedatasetfile::args, HltBtagPostValidation_cff::c, cancellation_manager_, CheckNotClosed(), cost_model_manager_, CreateDebuggerState(), device_mgr_, devices_, executor_lock_, tensorflow::NTSession::RunState::executors_done, tensorflow::NTSession::PerPartitionExecutorsAndLib::flib, GetOrCreateExecutors(), tensorflow::NTSession::PerPartitionExecutorsAndLib::graph, graph_def_lock_, mps_fire::i, tensorflow::NTSession::ExecutorsAndKeys::input_name_to_index, tensorflow::NTSession::ExecutorsAndKeys::input_types, PixelMapPlotter::inputs, B2GTnPMonitor_cfi::item, tensorflow::NTSession::ExecutorsAndKeys::items, dqmiolumiharvest::j, cmsLHEtoEOSManager::l, eostools::move(), operation_timeout_in_ms_, options_, tensorflow::NTSession::ExecutorsAndKeys::output_types, PatBasicFWLiteJetAnalyzer_Selector_cfg::outputs, tensorflow::NTSession::RunState::rendez, ResourceHandleToInputTensor(), runTheMatrix::ret, Run(), alignCSCRings::s, SchedClosure(), session_state_, btagGenBb_cfi::Status, tensorflow::NTSession::ExecutorsAndKeys::step_count, step_id_counter_, sync_on_finish_, and WaitForNotification().

Referenced by CheckNotClosed(), and GetOrCreateExecutors().

334  {
335  std::unique_ptr<DebugGraphDecoratorInterface> decorator;
336  TF_RETURN_IF_ERROR(DebugGraphDecoratorRegistry::CreateDecorator(debug_options, &decorator));
337 
338  TF_RETURN_IF_ERROR(decorator->DecorateGraph(graph, device));
339  TF_RETURN_IF_ERROR(decorator->PublishGraph(*graph, device->name()));
340  return Status::OK();
341  }
void tensorflow::NTSession::ExportCostModels ( CostModelManager::CostModelMap *  cost_models)
inline

Definition at line 122 of file NTSession.h.

References cost_model_manager_.

122  {
123  cost_model_manager_.ExportCostModels(cost_models);
124  }
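
Cost model collection is tied to the build_cost_model graph option (the only collector setup visible here is in PRunSetup() below); ExportCostModels() then copies the collected models into the caller-provided map. A hedged sketch of enabling collection:

tensorflow::SessionOptions opts;
// Non-zero enables per-node cost-model collection (see GraphOptions in the
// TensorFlow config proto for the precise meaning of the value).
opts.config.mutable_graph_options()->set_build_cost_model(1);
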
Status tensorflow::NTSession::Extend ( const GraphDef &  graph)
override

Definition at line 293 of file NTSession.cc.

References CheckNotClosed(), ExtendLocked(), graph_def_lock_, and cmsLHEtoEOSManager::l.

293  {
294  TF_RETURN_IF_ERROR(CheckNotClosed());
295  mutex_lock l(graph_def_lock_);
296  return ExtendLocked(graph);
297  }
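
Together, Create() and Extend() implement the usual Session contract: the first GraphDef must be installed with Create(), and later GraphDefs are merged into the existing graph with Extend(). A brief sketch, given a tensorflow::Session* session (e.g., obtained as sketched under the constructor above); the GraphDef contents are placeholders:

tensorflow::GraphDef initial_graph;  // e.g. filled via ReadBinaryProto()
tensorflow::GraphDef more_nodes;
TF_CHECK_OK(session->Create(initial_graph));  // valid only once per session
TF_CHECK_OK(session->Extend(more_nodes));     // merges into the already-created graph
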
Status tensorflow::NTSession::ExtendLocked ( const GraphDef &  graph)
private

Definition at line 299 of file NTSession.cc.

References flib_def_, and MaybeInitializeExecutionState().

Referenced by Create(), and Extend().

299  {
300  bool already_initialized;
301  // If this is the first call, we can initialize the execution state
302  // with `graph` and do not need to call `Extend()`.
303  TF_RETURN_IF_ERROR(MaybeInitializeExecutionState(graph, &already_initialized));
304  if (already_initialized) {
305  TF_RETURN_IF_ERROR(flib_def_->AddLibrary(graph.library()));
306  std::unique_ptr<GraphExecutionState> state;
307  TF_RETURN_IF_ERROR(execution_state_->Extend(graph, &state));
308  execution_state_.swap(state);
309  }
310  return Status::OK();
311  }
Status tensorflow::NTSession::GetOrCreateExecutors ( gtl::ArraySlice< string >  inputs,
gtl::ArraySlice< string >  outputs,
gtl::ArraySlice< string >  target_nodes,
ExecutorsAndKeys **  executors_and_keys,
RunStateArgs *  run_state_args 
)
private

Definition at line 918 of file NTSession.cc.

References CreateGraphs(), tensorflow::NTSession::RunStateArgs::debug_options, DecorateAndPublishGraphForDebug(), device_mgr_, device_set_, executor_lock_, dqmdumpme::first, tensorflow::NTSession::RunStateArgs::graph, graph_def_lock_, cuy::graphs, tensorflow::NTSession::RunStateArgs::handle, handle_name_counter_, mps_fire::i, triggerObjects_cff::id, input, tensorflow::NTSession::RunStateArgs::is_partial_run, B2GTnPMonitor_cfi::item, crabWrapper::key, cmsLHEtoEOSManager::l, mps_check::lib, eostools::move(), dqmiodumpmetadata::n, names, node_outputs_callback_, AlcaSiPixelAliHarvester0T_cff::options, options_, convertSQLitetoXML_cfg::output, CalibrationSummaryClient_cfi::params, and session_handle_.

Referenced by DecorateAndPublishGraphForDebug(), and PRunSetup().

922  {
923  int64 handle_name_counter_value = -1;
924  if (LogMemory::IsEnabled() || run_state_args->is_partial_run) {
925  handle_name_counter_value = handle_name_counter_.fetch_add(1);
926  }
927 
928  string debug_tensor_watches_summary;
929  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
930  debug_tensor_watches_summary =
931  SummarizeDebugTensorWatches(run_state_args->debug_options.debug_tensor_watch_opts());
932  }
933 
934  // Fast lookup path, no sorting.
935  const string key = strings::StrCat(str_util::Join(inputs, ","),
936  "->",
937  str_util::Join(outputs, ","),
938  "/",
939  str_util::Join(target_nodes, ","),
940  "/",
941  run_state_args->is_partial_run,
942  "/",
943  debug_tensor_watches_summary);
944  // Set the handle, if it's needed to log memory or for partial run.
945  if (handle_name_counter_value >= 0) {
946  run_state_args->handle = strings::StrCat(key, ";", handle_name_counter_value);
947  }
948 
949  // See if we already have the executors for this run.
950  {
951  mutex_lock l(executor_lock_); // could use reader lock
952  auto it = executors_.find(key);
953  if (it != executors_.end()) {
954  *executors_and_keys = it->second.get();
955  return Status::OK();
956  }
957  }
958 
959  // Slow lookup path, the unsorted key missed the cache.
960  // Sort the inputs and outputs, and look up with the sorted key in case an
961  // earlier call used a different order of inputs and outputs.
962  //
963  // We could consider some other signature instead of sorting that
964  // preserves the same property to avoid the sort in the future.
965  std::vector<string> inputs_sorted(inputs.begin(), inputs.end());
966  std::sort(inputs_sorted.begin(), inputs_sorted.end());
967  std::vector<string> outputs_sorted(outputs.begin(), outputs.end());
968  std::sort(outputs_sorted.begin(), outputs_sorted.end());
969  std::vector<string> tn_sorted(target_nodes.begin(), target_nodes.end());
970  std::sort(tn_sorted.begin(), tn_sorted.end());
971 
972  const string sorted_key = strings::StrCat(str_util::Join(inputs_sorted, ","),
973  "->",
974  str_util::Join(outputs_sorted, ","),
975  "/",
976  str_util::Join(tn_sorted, ","),
977  "/",
978  run_state_args->is_partial_run,
979  "/",
980  debug_tensor_watches_summary);
981  // Set the handle, if it's needed to log memory or for partial run.
982  if (handle_name_counter_value >= 0) {
983  run_state_args->handle = strings::StrCat(sorted_key, ";", handle_name_counter_value);
984  }
985 
986  // See if we already have the executors for this run.
987  {
988  mutex_lock l(executor_lock_);
989  auto it = executors_.find(sorted_key);
990  if (it != executors_.end()) {
991  *executors_and_keys = it->second.get();
992  // Insert this under the original key.
993  executors_.emplace(key, it->second);
994  return Status::OK();
995  }
996  }
997 
998  // Nothing found, so create the executors and store in the cache.
999  BuildGraphOptions options;
1000  options.feed_endpoints = inputs_sorted;
1001  options.fetch_endpoints = outputs_sorted;
1002  options.target_nodes = tn_sorted;
1003  options.use_function_convention = !run_state_args->is_partial_run;
1004  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
1005  options.debug_options = run_state_args->debug_options;
1006  }
1007 
1008  std::unique_ptr<FunctionInfo> func_info(new FunctionInfo);
1009  std::shared_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys);
1010 
1011  // The executor_lock_ is intentionally released while executor is
1012  // being created.
1013  std::unordered_map<string, std::unique_ptr<Graph>> graphs;
1014  TF_RETURN_IF_ERROR(
1015  CreateGraphs(options, &graphs, &func_info->flib_def, run_state_args, &ek->input_types, &ek->output_types));
1016 
1017  if (run_state_args->is_partial_run) {
1018  ek->graph = std::move(run_state_args->graph);
1019  std::unordered_set<StringPiece, StringPieceHasher> names;
1020  for (const string& input : inputs) {
1021  TensorId id(ParseTensorName(input));
1022  names.emplace(id.first);
1023  }
1024  for (const string& output : outputs) {
1025  TensorId id(ParseTensorName(output));
1026  names.emplace(id.first);
1027  }
1028  for (Node* n : ek->graph->nodes()) {
1029  if (names.count(n->name()) > 0) {
1030  ek->name_to_node.insert({n->name(), n});
1031  }
1032  }
1033  }
1034  ek->items.reserve(graphs.size());
1035  const auto& optimizer_opts = options_.config.graph_options().optimizer_options();
1036 
1037  int graph_def_version;
1038  {
1039  mutex_lock l(graph_def_lock_);
1040  graph_def_version = execution_state_->original_graph_def().versions().producer();
1041  }
1042  func_info->proc_flr.reset(new ProcessFunctionLibraryRuntime(
1043  device_mgr_.get(), options_.env, graph_def_version, func_info->flib_def.get(), optimizer_opts));
1044 
1045  GraphOptimizer optimizer(optimizer_opts);
1046  for (auto iter = graphs.begin(); iter != graphs.end(); ++iter) {
1047  const string& partition_name = iter->first;
1048  std::unique_ptr<Graph>& partition_graph = iter->second;
1049 
1050  Device* device;
1051  TF_RETURN_IF_ERROR(device_mgr_->LookupDevice(partition_name, &device));
1052 
1053  ek->items.resize(ek->items.size() + 1);
1054  auto* item = &(ek->items.back());
1055  auto lib = func_info->proc_flr->GetFLR(partition_name);
1056  if (lib == nullptr) {
1057  return errors::Internal("Could not find device: ", partition_name);
1058  }
1059  item->flib = lib;
1060 
1061  LocalExecutorParams params;
1062  params.device = device;
1063  params.function_library = lib;
1064  auto opseg = device->op_segment();
1065  params.create_kernel = [this, lib, opseg](const NodeDef& ndef, OpKernel** kernel) {
1066  // We do not share the kernel via the OpSegment if the node is
1067  // stateless, or a function.
1068  // NOTE(mrry): We must not share function kernels (implemented
1069  // using `CallOp`) between subgraphs, because `CallOp::handle_`
1070  // is tied to a particular subgraph. Even if the function itself
1071  // is stateful, the `CallOp` that invokes it is not.
1072  if (!lib->IsStateful(ndef.op()) || lib->GetFunctionLibraryDefinition()->Find(ndef.op()) != nullptr) {
1073  return lib->CreateKernel(ndef, kernel);
1074  }
1075  auto create_fn = [lib, &ndef](OpKernel** kernel) { return lib->CreateKernel(ndef, kernel); };
1076  // Kernels created for subgraph nodes need to be cached. On
1077  // cache miss, create_fn() is invoked to create a kernel based
1078  // on the function library here + global op registry.
1079  return opseg->FindOrCreate(session_handle_, ndef.name(), kernel, create_fn);
1080  };
1081  params.delete_kernel = [lib](OpKernel* kernel) {
1082  // If the node is stateful, opseg owns it. Otherwise, delete it.
1083  if (kernel && !lib->IsStateful(kernel->type_string())) {
1084  delete kernel;
1085  }
1086  };
1087  params.node_outputs_cb = node_outputs_callback_;
1088 
1089  optimizer.Optimize(lib,
1090  options_.env,
1091  device,
1092  &iter->second,
1093  /*shape_map=*/nullptr);
1094 
1095  // EXPERIMENTAL: tfdbg inserts debug nodes in the graph.
1096  if (!options.debug_options.debug_tensor_watch_opts().empty()) {
1097  TF_RETURN_IF_ERROR(
1098  DecorateAndPublishGraphForDebug(options.debug_options, partition_graph.get(), params.device));
1099  }
1100 
1101  TF_RETURN_IF_ERROR(EnsureMemoryTypes(DeviceType(device->device_type()), device->name(), partition_graph.get()));
1102  // NewLocalExecutor takes ownership of partition_graph.
1103  item->graph = partition_graph.get();
1104  item->executor = nullptr;
1105  item->device = device;
1106  Executor* executor;
1107  TF_RETURN_IF_ERROR(NewLocalExecutor(params, partition_graph.release(), &executor));
1108  item->executor.reset(executor);
1109  }
1110 
1111  // Cache the mapping from input/output names to graph elements to
1112  // avoid recomputing it every time.
1113  if (!run_state_args->is_partial_run) {
1114  // For regular `Run()`, we use the function calling convention, and so
1115  // maintain a mapping from input/output names to
1116  // argument/return-value ordinal index.
1117  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
1118  const string& input = inputs_sorted[i];
1119  ek->input_name_to_index[input] = i;
1120  }
1121  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
1122  const string& output = outputs_sorted[i];
1123  ek->output_name_to_index[output] = i;
1124  }
1125  } else {
1126  // For `PRun()`, we use the rendezvous calling convention, and so
1127  // maintain a mapping from input/output names to rendezvous keys.
1128  //
1129  // We always use the first device as the device name portion of the
1130  // key, even if we're feeding another graph.
1131  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
1132  const string& input = inputs_sorted[i];
1133  ek->input_name_to_rendezvous_key[input] =
1134  GetRendezvousKey(input, device_set_.client_device()->attributes(), FrameAndIter(0, 0));
1135  }
1136  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
1137  const string& output = outputs_sorted[i];
1138  ek->output_name_to_rendezvous_key[output] =
1139  GetRendezvousKey(output, device_set_.client_device()->attributes(), FrameAndIter(0, 0));
1140  }
1141  }
1142 
1143  // Reacquire the lock, try to insert into the map.
1144  mutex_lock l(executor_lock_);
1145  functions_.push_back(std::move(func_info));
1146 
1147  // Another thread may have created the entry before us, in which case we will
1148  // reuse the already created one.
1149  auto insert_result = executors_.emplace(sorted_key, ek);
1150  // Insert the value under the original key, so the fast path lookup will work
1151  // if the user uses the same order of inputs, outputs, and targets again.
1152  executors_.emplace(key, insert_result.first->second);
1153  *executors_and_keys = insert_result.first->second.get();
1154 
1155  return Status::OK();
1156  }
bool graph_created_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
GraphDef graph_def_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
std::vector<std::unique_ptr<FunctionInfo> > functions_ tensorflow::NTSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, std::shared_ptr<ExecutorsAndKeys> > executors_ tensorflow::NTSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, std::unique_ptr<RunState> > partial_runs_ tensorflow::NTSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, string> stateful_placements_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
std::unique_ptr<GraphExecutionState> execution_state_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
bool closed_ tensorflow::NTSession::GUARDED_BY ( closed_lock_  )
private
tensorflow::Status tensorflow::NTSession::ListDevices ( std::vector< DeviceAttributes > *  response)
override

Definition at line 1310 of file NTSession.cc.

References ztail::d, and devices_.

1310  {
1311  response->clear();
1312  response->reserve(devices_.size());
1313  for (Device* d : devices_) {
1314  const DeviceAttributes& attrs = d->attributes();
1315  response->emplace_back(attrs);
1316  }
1317  return ::tensorflow::Status::OK();
1318  }
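
A usage sketch of device enumeration through the Session interface, given a tensorflow::Session* session as above; the log format is illustrative only:

std::vector<tensorflow::DeviceAttributes> devices;
TF_CHECK_OK(session->ListDevices(&devices));
for (const tensorflow::DeviceAttributes& d : devices) {
  LOG(INFO) << d.name() << " (" << d.device_type() << ")";
}
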
::tensorflow::Status tensorflow::NTSession::LocalDeviceManager ( const DeviceMgr **  output)
inlineoverride

Definition at line 117 of file NTSession.h.

References device_mgr_.

117  {
118  *output = device_mgr_.get();
119  return ::tensorflow::Status::OK();
120  }
Status tensorflow::NTSession::MaybeInitializeExecutionState ( const GraphDef &  graph,
bool *  out_already_initialized 
)
private

Definition at line 253 of file NTSession.cc.

References device_set_, flib_def_, AlcaSiPixelAliHarvester0T_cff::options, options_, and groupFilesInBlocks::temp.

Referenced by ExtendLocked().

253  {
254  // If already initialized, do nothing.
255  if (flib_def_ && execution_state_) {
256  *out_already_initialized = true;
257  return Status::OK();
258  }
259  // Set up the per-session execution state.
260  // NOTE(mrry): The function library created here will be used for
261  // all subsequent extensions of the graph.
262  flib_def_.reset(new FunctionLibraryDefinition(OpRegistry::Global(), graph.library()));
263  GraphExecutionStateOptions options;
264  options.device_set = &device_set_;
265  options.session_options = &options_;
266  // TODO(mrry,suharshs): We explicitly copy `graph` so that
267  // `MakeForBaseGraph()` can take ownership of its
268  // contents. Previously this happened implicitly in calls to the
269  // `GraphExecutionState`. Other sessions call
270  // `MakeForBaseGraph` in such a way that we can destructively read
271  // the passed-in `GraphDef`. In principle we could do the same here,
272  // with a wider refactoring; we might revise the direct session so
273  // that it copies the graph fewer times.
274  GraphDef temp(graph);
275  TF_RETURN_IF_ERROR(GraphExecutionState::MakeForBaseGraph(&temp, options, &execution_state_));
276  graph_created_ = true;
277  *out_already_initialized = false;
278  return Status::OK();
279  }
Status tensorflow::NTSession::PRun ( const string &  handle,
const NamedTensorList &  inputs,
const std::vector< string > &  output_names,
std::vector< Tensor > *  outputs 
)
override

Definition at line 659 of file NTSession.cc.

References cancellation_manager_, CheckFetch(), CheckNotClosed(), fileCollector::done, executor_lock_, input, crabWrapper::key, cmsLHEtoEOSManager::l, LOG, tensorflow::NTSession::RunState::mu_, Skims_PA_cff::name, operation_timeout_in_ms_, convertSQLitetoXML_cfg::output, contentValuesFiles::parts, tensorflow::NTSession::RunState::pending_inputs, tensorflow::NTSession::RunState::pending_outputs, tensorflow::NTSession::RunState::PendingDone(), RecvPRunOutputs(), tensorflow::NTSession::RunState::rendez, alignCSCRings::s, SendPRunInputs(), session_state_, btagGenBb_cfi::Status, tensorflow::NTSession::RunState::tensor_store, WaitForNotification(), and hlt_jetmet_dqm_QT_fromfile_cfg::WARNING.

662  {
663  TF_RETURN_IF_ERROR(CheckNotClosed());
664  std::vector<string> parts = str_util::Split(handle, ';');
665  const string& key = parts[0];
666  // Get the executors for this partial run.
667  ExecutorsAndKeys* executors_and_keys;
668  RunState* run_state;
669  {
670  mutex_lock l(executor_lock_); // could use reader lock
671  auto exc_it = executors_.find(key);
672  if (exc_it == executors_.end()) {
673  return errors::InvalidArgument("Must run 'setup' before performing partial runs!");
674  }
675  executors_and_keys = exc_it->second.get();
676 
677  auto prun_it = partial_runs_.find(handle);
678  if (prun_it == partial_runs_.end()) {
679  return errors::InvalidArgument("Must run 'setup' before performing partial runs!");
680  }
681  run_state = prun_it->second.get();
682 
683  // Make sure that this is a new set of feeds that are still pending.
684  for (const auto& input : inputs) {
685  auto it = run_state->pending_inputs.find(input.first);
686  if (it == run_state->pending_inputs.end()) {
687  return errors::InvalidArgument("The feed ", input.first, " was not specified in partial_run_setup.");
688  } else if (it->second) {
689  return errors::InvalidArgument("The feed ", input.first, " has already been fed.");
690  }
691  }
692  // Check that this is a new set of fetches that are still pending.
693  for (const auto& output : output_names) {
694  auto it = run_state->pending_outputs.find(output);
695  if (it == run_state->pending_outputs.end()) {
696  return errors::InvalidArgument("The fetch ", output, " was not specified in partial_run_setup.");
697  } else if (it->second) {
698  return errors::InvalidArgument("The fetch ", output, " has already been fetched.");
699  }
700  }
701  }
702 
703  // Check that this new set of fetches can be computed from all the
704  // feeds we have supplied.
705  TF_RETURN_IF_ERROR(CheckFetch(inputs, output_names, executors_and_keys, run_state));
706 
707  // Send inputs.
708  Status s = SendPRunInputs(inputs, executors_and_keys, run_state->rendez);
709 
710  // Receive outputs.
711  if (s.ok()) {
712  s = RecvPRunOutputs(output_names, executors_and_keys, run_state, outputs);
713  }
714 
715  // Save the output tensors of this run we choose to keep.
716  if (s.ok()) {
717  s = run_state->tensor_store.SaveTensors(output_names, &session_state_);
718  }
719 
720  {
721  mutex_lock l(executor_lock_);
722  // Delete the run state if there is an error or all fetches are done.
723  bool done = true;
724  if (s.ok()) {
725  {
726  mutex_lock l(run_state->mu_);
727  if (!run_state->status.ok()) {
728  LOG(WARNING) << "An error unrelated to this prun has been detected. " << run_state->status;
729  }
730  }
731  for (const auto& input : inputs) {
732  auto it = run_state->pending_inputs.find(input.first);
733  it->second = true;
734  }
735  for (const auto& name : output_names) {
736  auto it = run_state->pending_outputs.find(name);
737  it->second = true;
738  }
739  done = run_state->PendingDone();
740  }
741  if (done) {
743  partial_runs_.erase(handle);
744  }
745  }
746 
747  return s;
748  }
Status tensorflow::NTSession::PRunSetup ( const std::vector< string > &  input_names,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
string *  handle 
)
override

Definition at line 592 of file NTSession.cc.

References writedatasetfile::args, HltBtagPostValidation_cff::c, cancellation_manager_, CheckNotClosed(), device_mgr_, devices_, executor_lock_, tensorflow::NTSession::RunState::executors_done, GetOrCreateExecutors(), graph_def_lock_, patZpeak::handle, tensorflow::NTSession::RunStateArgs::handle, tensorflow::NTSession::RunStateArgs::is_partial_run, B2GTnPMonitor_cfi::item, tensorflow::NTSession::ExecutorsAndKeys::items, cmsLHEtoEOSManager::l, eostools::move(), options_, tensorflow::NTSession::RunState::rendez, runTheMatrix::ret, personalPlayback::RunState, SchedClosure(), session_state_, btagGenBb_cfi::Status, step_id_counter_, and sync_on_finish_.

595  {
596  TF_RETURN_IF_ERROR(CheckNotClosed());
597  {
598  mutex_lock l(graph_def_lock_);
599  if (!graph_created_) {
600  return errors::InvalidArgument("Session was not created with a graph before PRunSetup()!");
601  }
602  }
603 
604  // Check if we already have an executor for these arguments.
605  ExecutorsAndKeys* executors_and_keys;
606  // TODO(cais): TFDBG support for partial runs.
607  DebugOptions debug_options;
608  RunStateArgs run_state_args(debug_options);
609  run_state_args.is_partial_run = true;
610  TF_RETURN_IF_ERROR(
611  GetOrCreateExecutors(input_names, output_names, target_nodes, &executors_and_keys, &run_state_args));
612 
613  // Create the run state and save it for future PRun calls.
614  Executor::Args args;
615  args.step_id = step_id_counter_.fetch_add(1);
616  RunState* run_state = new RunState(input_names, output_names, args.step_id, &devices_);
617  run_state->rendez = new IntraProcessRendezvous(device_mgr_.get());
618  {
619  mutex_lock l(executor_lock_);
620  if (!partial_runs_.emplace(run_state_args.handle, std::unique_ptr<RunState>(run_state)).second) {
621  return errors::Internal("The handle '", run_state_args.handle, "' created for this partial run is not unique.");
622  }
623  }
624 
625  // Start parallel Executors.
626  const size_t num_executors = executors_and_keys->items.size();
627  ExecutorBarrier* barrier = new ExecutorBarrier(num_executors, run_state->rendez, [run_state](const Status& ret) {
628  if (!ret.ok()) {
629  mutex_lock l(run_state->mu_);
630  run_state->status.Update(ret);
631  }
632  run_state->executors_done.Notify();
633  });
634 
635  args.rendezvous = run_state->rendez;
636  args.cancellation_manager = cancellation_manager_;
637  args.runner = [this](Executor::Args::Closure c) { SchedClosure(std::move(c)); };
638  args.session_state = &session_state_;
639  args.tensor_store = &run_state->tensor_store;
640  args.step_container = &run_state->step_container;
641  if (LogMemory::IsEnabled()) {
642  LogMemory::RecordStep(args.step_id, run_state_args.handle);
643  }
644  args.sync_on_finish = sync_on_finish_;
645 
646  if (options_.config.graph_options().build_cost_model()) {
647  run_state->collector.reset(new StepStatsCollector(nullptr));
648  args.stats_collector = run_state->collector.get();
649  }
650 
651  for (auto& item : executors_and_keys->items) {
652  item.executor->RunAsync(args, barrier->Get());
653  }
654 
655  *handle = run_state_args.handle;
656  return Status::OK();
657  }
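
PRunSetup() and PRun() together implement the partial-run protocol: all feeds, fetches, and target nodes are declared once up front, and the returned handle is then used to feed and fetch them incrementally. A sketch under those assumptions, given a tensorflow::Session* session with a created graph; node names and the tensors ta, tb are placeholders:

std::string handle;
TF_CHECK_OK(session->PRunSetup({"a", "b"},       // feeds to be supplied later
                               {"sum", "prod"},  // fetches to be requested later
                               {},               // target nodes
                               &handle));

std::vector<tensorflow::Tensor> out;
TF_CHECK_OK(session->PRun(handle, {{"a", ta}, {"b", tb}}, {"sum"}, &out));
TF_CHECK_OK(session->PRun(handle, {}, {"prod"}, &out));  // remaining fetch, no new feeds
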
Status tensorflow::NTSession::RecvPRunOutputs ( const std::vector< string > &  output_names,
const ExecutorsAndKeys *  executors_and_keys,
RunState *  run_state,
std::vector< Tensor > *  outputs 
)
private

Definition at line 813 of file NTSession.cc.

References operation_timeout_in_ms_, tensorflow::NTSession::ExecutorsAndKeys::output_name_to_rendezvous_key, tensorflow::NTSession::RunState::rendez, alignCSCRings::s, and btagGenBb_cfi::Status.

Referenced by PRun().

816  {
817  Status s;
818  if (!output_names.empty()) {
819  outputs->resize(output_names.size());
820  }
821 
822  Rendezvous::ParsedKey parsed;
823  // Get the outputs from the rendezvous
824  for (size_t output_offset = 0; output_offset < output_names.size(); ++output_offset) {
825  const string& output_name = output_names[output_offset];
826  auto it = executors_and_keys->output_name_to_rendezvous_key.find(output_name);
827  if (it == executors_and_keys->output_name_to_rendezvous_key.end()) {
828  return errors::Internal("'", output_name, "' is not a pre-defined fetch.");
829  }
830  const string& output_key = it->second;
831  Tensor output_tensor;
832  bool is_dead;
833  IntraProcessRendezvous* rendez = run_state->rendez;
834 
835  s = Rendezvous::ParseKey(output_key, &parsed);
836  if (s.ok()) {
837  // Fetch data from the Rendezvous.
838  s = rendez->Recv(parsed, Rendezvous::Args(), &output_tensor, &is_dead, operation_timeout_in_ms_);
839  if (is_dead && s.ok()) {
840  s = errors::InvalidArgument("The tensor returned for ", output_name, " was not valid.");
841  }
842  }
843  if (!s.ok()) {
844  rendez->StartAbort(s);
845  outputs->clear();
846  return s;
847  }
848 
849  (*outputs)[output_offset] = output_tensor;
850  }
851  return Status::OK();
852  }
tensorflow::Status tensorflow::NTSession::Reset ( const std::vector< string > &  containers)

Definition at line 1320 of file NTSession.cc.

References device_mgr_.

1320  {
1321  device_mgr_->ClearContainers(containers);
1322  return ::tensorflow::Status::OK();
1323  }
Status tensorflow::NTSession::ResourceHandleToInputTensor ( const Tensor &  resource_tensor,
Tensor *  retrieved_tensor 
)
private

Definition at line 750 of file NTSession.cc.

References session_state_.

Referenced by DecorateAndPublishGraphForDebug(), and SendPRunInputs().

750  {
751  if (resource_tensor.dtype() != DT_RESOURCE) {
752  return errors::InvalidArgument(
753  strings::StrCat("ResourceHandleToInputTensor() received non-DT_RESOURCE Tensor: ", resource_tensor.dtype()));
754  }
755 
756  const ResourceHandle& resource_handle = resource_tensor.scalar<ResourceHandle>()();
757 
758  if (resource_handle.container() == SessionState::kTensorHandleResourceTypeName) {
759  return session_state_.GetTensor(resource_handle.name(), retrieved_tensor);
760  } else {
761  return errors::InvalidArgument(
762  strings::StrCat("Invalid resource type hash code: ",
763  resource_handle.hash_code(),
764  "(name: ",
765  resource_handle.name(),
766  " type: ",
767  resource_handle.maybe_type_name(),
768  "). Perhaps a resource tensor was being provided as a feed? That is "
769  "not currently allowed. Please file an issue at "
770  "https://github.com/tensorflow/tensorflow/issues/new, ideally with a "
771  "short code snippet that leads to this error message."));
772  }
773  }
Status tensorflow::NTSession::Run ( const NamedTensorList &  inputs,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
std::vector< Tensor > *  outputs 
)
override

Definition at line 313 of file NTSession.cc.

Referenced by DecorateAndPublishGraphForDebug().

316  {
317  RunMetadata run_metadata;
318  return Run(RunOptions(), inputs, output_names, target_nodes, outputs, &run_metadata);
319  }
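
A usage sketch of this overload, given a tensorflow::Session* session with a created graph; node names are placeholders and x is a previously filled input tensor (see the NamedTensorList sketch above):

std::vector<tensorflow::Tensor> outputs;
tensorflow::Status s = session->Run({{"input", x}},  // feeds
                                    {"softmax"},     // fetches
                                    {"init"},        // target nodes to run
                                    &outputs);
if (!s.ok()) {
  LOG(ERROR) << s.error_message();
}
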
::tensorflow::Status tensorflow::NTSession::Run ( const ::tensorflow::RunOptions &  run_options,
const NamedTensorList &  inputs,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
std::vector< Tensor > *  outputs,
RunMetadata *  run_metadata 
)
override
void tensorflow::NTSession::SchedClosure ( std::function< void()>  c)
private
Status tensorflow::NTSession::SendPRunInputs ( const std::vector< std::pair< string, Tensor >> &  inputs,
const ExecutorsAndKeys *  executors_and_keys,
IntraProcessRendezvous *  rendez 
)
private

Definition at line 775 of file NTSession.cc.

References input, tensorflow::NTSession::ExecutorsAndKeys::input_name_to_rendezvous_key, ResourceHandleToInputTensor(), alignCSCRings::s, and btagGenBb_cfi::Status.

Referenced by PRun().

777  {
778  Status s;
779  Rendezvous::ParsedKey parsed;
780  // Insert the input tensors into the local rendezvous by their
781  // rendezvous key.
782  for (const auto& input : inputs) {
783  auto it = executors_and_keys->input_name_to_rendezvous_key.find(input.first);
784  if (it == executors_and_keys->input_name_to_rendezvous_key.end()) {
785  return errors::Internal("'", input.first, "' is not a pre-defined feed.");
786  }
787  const string& input_key = it->second;
788 
789  s = Rendezvous::ParseKey(input_key, &parsed);
790  if (!s.ok()) {
791  rendez->StartAbort(s);
792  return s;
793  }
794 
795  if (input.second.dtype() == DT_RESOURCE) {
796  Tensor tensor_from_handle;
797  s = ResourceHandleToInputTensor(input.second, &tensor_from_handle);
798  if (s.ok()) {
799  s = rendez->Send(parsed, Rendezvous::Args(), tensor_from_handle, false);
800  }
801  } else {
802  s = rendez->Send(parsed, Rendezvous::Args(), input.second, false);
803  }
804 
805  if (!s.ok()) {
806  rendez->StartAbort(s);
807  return s;
808  }
809  }
810  return Status::OK();
811  }
tensorflow::NTSession::TF_DISALLOW_COPY_AND_ASSIGN ( NTSession  )
private
tensorflow::Status tensorflow::NTSession::WaitForNotification ( Notification *  n,
int64  timeout_in_ms 
)
private

Definition at line 1398 of file NTSession.cc.

References btagGenBb_cfi::Status.

Referenced by DecorateAndPublishGraphForDebug(), PRun(), and WaitForNotification().

1398  {
1399  if (timeout_in_ms > 0) {
1400  const int64 timeout_in_us = timeout_in_ms * 1000;
1401  const bool notified = WaitForNotificationWithTimeout(notification, timeout_in_us);
1402  if (!notified) {
1403  return Status(error::DEADLINE_EXCEEDED, "Timed out waiting for notification");
1404  }
1405  } else {
1406  notification->WaitForNotification();
1407  }
1408  return Status::OK();
1409  }
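
The timeout used here is operation_timeout_in_ms_, which the constructor reads from the session configuration; zero (the default) means wait indefinitely. A small sketch of requesting a finite timeout:

tensorflow::SessionOptions opts;
opts.config.set_operation_timeout_in_ms(5000);  // Run()/PRun() give up after 5 seconds
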
void tensorflow::NTSession::WaitForNotification ( RunState *  run_state,
CancellationManager *  cm,
int64  timeout_in_ms 
)
private

Definition at line 1383 of file NTSession.cc.

References tensorflow::NTSession::RunState::executors_done, cmsLHEtoEOSManager::l, tensorflow::NTSession::RunState::mu_, btagGenBb_cfi::Status, mps_update::status, and WaitForNotification().

1383  {
1384  const Status status = WaitForNotification(&run_state->executors_done, timeout_in_ms);
1385  if (!status.ok()) {
1386  {
1387  mutex_lock l(run_state->mu_);
1388  run_state->status.Update(status);
1389  }
1390  cm->StartCancel();
1391  // We must wait for the executors to complete, because they have borrowed
1392  // references to `cm` and other per-step state. After this notification, it
1393  // is safe to clean up the step.
1394  run_state->executors_done.WaitForNotification();
1395  }
1396  }

Friends And Related Function Documentation

friend class DebugGateway
friend

Definition at line 359 of file NTSession.h.

Member Data Documentation

CancellationManager* tensorflow::NTSession::cancellation_manager_
private

Definition at line 321 of file NTSession.h.

Referenced by Close(), DecorateAndPublishGraphForDebug(), PRun(), PRunSetup(), and ~NTSession().

mutex tensorflow::NTSession::closed_lock_
private

Definition at line 338 of file NTSession.h.

Referenced by CheckNotClosed(), and Close().

CostModelManager tensorflow::NTSession::cost_model_manager_
private

Definition at line 352 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), and ExportCostModels().

const std::unique_ptr<const DeviceMgr> tensorflow::NTSession::device_mgr_
private
DeviceSet tensorflow::NTSession::device_set_
private
std::vector<Device*> tensorflow::NTSession::devices_
private
std::atomic<int64> tensorflow::NTSession::edge_name_counter_ = {0}
private

Definition at line 342 of file NTSession.h.

Referenced by CreateGraphs().

mutex tensorflow::NTSession::executor_lock_
private
NTSessionFactory* const tensorflow::NTSession::factory_
private

Definition at line 320 of file NTSession.h.

Referenced by Close().

std::unique_ptr<FunctionLibraryDefinition> tensorflow::NTSession::flib_def_
private

Definition at line 335 of file NTSession.h.

Referenced by CreateGraphs(), ExtendLocked(), MaybeInitializeExecutionState(), and ~NTSession().

mutex tensorflow::NTSession::graph_def_lock_
private
std::atomic<int64> tensorflow::NTSession::handle_name_counter_ = {0}
private

Definition at line 343 of file NTSession.h.

Referenced by GetOrCreateExecutors().

Status tensorflow::NTSession::init_error_
private

Definition at line 298 of file NTSession.h.

Referenced by Create().

Executor::Args::NodeOutputsCallback tensorflow::NTSession::node_outputs_callback_ = nullptr
private

Definition at line 354 of file NTSession.h.

Referenced by GetOrCreateExecutors().

const int64 tensorflow::NTSession::operation_timeout_in_ms_ = 0
private

Definition at line 349 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), PRun(), and RecvPRunOutputs().

const SessionOptions tensorflow::NTSession::options_
private
string tensorflow::NTSession::session_handle_
private

Definition at line 292 of file NTSession.h.

Referenced by GetOrCreateExecutors(), NTSession(), and ~NTSession().

SessionState tensorflow::NTSession::session_state_
private
std::atomic_int_fast64_t tensorflow::NTSession::step_id_counter_
staticprivate

Definition at line 346 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), and PRunSetup().

bool tensorflow::NTSession::sync_on_finish_ = true
private

Definition at line 301 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), NTSession(), and PRunSetup().