CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes | Friends
tensorflow::NTSession Class Reference

#include <NTSession.h>

Inheritance diagram for tensorflow::NTSession:
Session

Classes

struct  ExecutorsAndKeys
 
struct  FunctionInfo
 
struct  PerPartitionExecutorsAndLib
 
struct  RunState
 
struct  RunStateArgs
 

Public Types

typedef std::function< void(Session *)> CloseCallback
 
typedef std::vector< std::pair< string, Tensor > > NamedTensorList
 
typedef std::unordered_map< StringPiece, Node *, StringPieceHasher > NameNodeMap
 

Public Member Functions

::tensorflow::Status Close () override
 
::tensorflow::Status Create (const GraphDef &graph) override
 
void ExportCostModels (CostModelManager::CostModelMap *cost_models)
 
::tensorflow::Status Extend (const GraphDef &graph) override
 
::tensorflow::Status ListDevices (std::vector< DeviceAttributes > *response) override
 
::tensorflow::Status LocalDeviceManager (const DeviceMgr **output) override
 
 NTSession (const SessionOptions &options, const DeviceMgr *device_mgr, NTSessionFactory *factory)
 
::tensorflow::Status PRun (const string &handle, const NamedTensorList &inputs, const std::vector< string > &output_names, std::vector< Tensor > *outputs) override
 
::tensorflow::Status PRunSetup (const std::vector< string > &input_names, const std::vector< string > &output_names, const std::vector< string > &target_nodes, string *handle) override
 
::tensorflow::Status Reset (const std::vector< string > &containers)
 
::tensorflow::Status Run (const NamedTensorList &inputs, const std::vector< string > &output_names, const std::vector< string > &target_nodes, std::vector< Tensor > *outputs) override
 
::tensorflow::Status Run (const ::tensorflow::RunOptions &run_options, const NamedTensorList &inputs, const std::vector< string > &output_names, const std::vector< string > &target_nodes, std::vector< Tensor > *outputs, RunMetadata *run_metadata) override
 
 ~NTSession () override
 

Private Member Functions

::tensorflow::Status CheckFetch (const std::vector< std::pair< string, Tensor >> &feeds, const std::vector< string > &fetches, const ExecutorsAndKeys *executors_and_keys, const RunState *run_state)
 
::tensorflow::Status CheckNotClosed ()
 
::tensorflow::Status CreateDebuggerState (const DebugOptions &debug_options, int64 session_run_index, int64 executor_step_index, const std::vector< string > &input_names, const std::vector< string > &output_names, const std::vector< string > &target_names, std::unique_ptr< DebuggerStateInterface > *debugger_state)
 
::tensorflow::Status CreateGraphs (const BuildGraphOptions &options, std::unordered_map< string, std::unique_ptr< Graph >> *outputs, std::unique_ptr< FunctionLibraryDefinition > *flib_def, RunStateArgs *run_state_args, DataTypeVector *input_types, DataTypeVector *output_types)
 
::tensorflow::Status DecorateAndPublishGraphForDebug (const DebugOptions &debug_options, Graph *graph, Device *device)
 
::tensorflow::Status ExtendLocked (const GraphDef &graph) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
 
::tensorflow::Status GetOrCreateExecutors (gtl::ArraySlice< string > inputs, gtl::ArraySlice< string > outputs, gtl::ArraySlice< string > target_nodes, ExecutorsAndKeys **executors_and_keys, RunStateArgs *run_state_args)
 
bool graph_created_ GUARDED_BY (graph_def_lock_)
 
GraphDef graph_def_ GUARDED_BY (graph_def_lock_)
 
std::vector< std::unique_ptr< FunctionInfo > > functions_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, std::shared_ptr< ExecutorsAndKeys > > executors_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, std::unique_ptr< RunState > > partial_runs_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, string > stateful_placements_ GUARDED_BY (graph_def_lock_)
 
std::unique_ptr< GraphExecutionState > execution_state_ GUARDED_BY (graph_def_lock_)
 
bool closed_ GUARDED_BY (closed_lock_)
 
Status MaybeInitializeExecutionState (const GraphDef &graph, bool *out_already_initialized) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
 
::tensorflow::Status RecvPRunOutputs (const std::vector< string > &output_names, const ExecutorsAndKeys *executors_and_keys, RunState *run_state, std::vector< Tensor > *outputs)
 
::tensorflow::Status ResourceHandleToInputTensor (const Tensor &resource_tensor, Tensor *retrieved_tensor)
 
void SchedClosure (std::function< void()> c)
 
::tensorflow::Status SendPRunInputs (const std::vector< std::pair< string, Tensor >> &inputs, const ExecutorsAndKeys *executors_and_keys, IntraProcessRendezvous *rendez)
 
 TF_DISALLOW_COPY_AND_ASSIGN (NTSession)
 
::tensorflow::Status WaitForNotification (Notification *n, int64 timeout_in_ms)
 
void WaitForNotification (RunState *run_state, CancellationManager *cm, int64 timeout_in_ms)
 

Private Attributes

CancellationManager * cancellation_manager_
 
mutex closed_lock_
 
CostModelManager cost_model_manager_
 
const std::unique_ptr< const DeviceMgr > device_mgr_
 
DeviceSet device_set_
 
std::vector< Device * > devices_
 
std::atomic< int64 > edge_name_counter_ = {0}
 
mutex executor_lock_
 
NTSessionFactory *const factory_
 
std::unique_ptr< FunctionLibraryDefinition > flib_def_
 
mutex graph_def_lock_
 
std::atomic< int64 > handle_name_counter_ = {0}
 
Status init_error_
 
Executor::Args::NodeOutputsCallback node_outputs_callback_ = nullptr
 
const int64 operation_timeout_in_ms_ = 0
 
const SessionOptions options_
 
string session_handle_
 
SessionState session_state_
 
bool sync_on_finish_ = true
 

Static Private Attributes

static std::atomic_int_fast64_t step_id_counter_
 

Friends

class DebugGateway
 

Detailed Description

Definition at line 71 of file NTSession.h.

Member Typedef Documentation

typedef std::function<void(Session*)> tensorflow::NTSession::CloseCallback

Definition at line 73 of file NTSession.h.

typedef std::vector<std::pair<string, Tensor> > tensorflow::NTSession::NamedTensorList

Definition at line 83 of file NTSession.h.

typedef std::unordered_map<StringPiece, Node*, StringPieceHasher> tensorflow::NTSession::NameNodeMap

Definition at line 84 of file NTSession.h.

Constructor & Destructor Documentation

tensorflow::NTSession::NTSession ( const SessionOptions &  options,
const DeviceMgr *  device_mgr,
NTSessionFactory *  factory 
)

Definition at line 196 of file NTSession.cc.

References edmIntegrityCheck::d, device_mgr_, device_set_, devices_, dqm::qstatus::ERROR, MessageLogger_cfi::INFO, LOG, session_handle_, btagGenBb_cfi::Status, mps_update::status, and sync_on_finish_.

199  : options_(options),
200  device_mgr_(device_mgr),
201  factory_(factory),
202  cancellation_manager_(new CancellationManager()),
203  operation_timeout_in_ms_(options_.config.operation_timeout_in_ms()) {
204  // The default value of sync_on_finish will be flipped soon and this
205  // environment variable will be removed as well.
206  const Status status =
207  ReadBoolFromEnvVar("TF_SYNC_ON_FINISH", true, &sync_on_finish_);
208  if (!status.ok()) {
209  LOG(ERROR) << status.error_message();
210  }
211  // NOTE(mrry): We do not need to use a unique string for the session
212  // handle, because NTSession owns its devices. This may change
213  // in future versions.
214  session_handle_ = "no_threads";
215  int devices_added = 0;
216  if (options.config.log_device_placement()) {
217  const string mapping_str = device_mgr_->DeviceMappingString();
218  if (mapping_str.empty()) {
219  printf("Device mapping: no known devices.\n");
220  } else {
221  printf("Device mapping:\n%s", mapping_str.c_str());
222  }
223  LOG(INFO) << "Device mapping:\n" << mapping_str;
224  }
225  for (auto d : device_mgr_->ListDevices()) {
226  devices_.push_back(d);
227  device_set_.AddDevice(d);
228  d->op_segment()->AddHold(session_handle_);
229 
230  // The first device added is special: it is the 'client device' (a
231  // CPU device) from which we feed and fetch Tensors.
232  if (devices_added == 0) {
233  device_set_.set_client_device(d);
234  }
235  ++devices_added;
236  }
237 }
const SessionOptions options_
Definition: NTSession.h:287
#define LOG(A)
DeviceSet device_set_
Definition: NTSession.h:292
const std::unique_ptr< const DeviceMgr > device_mgr_
Definition: NTSession.h:290
std::vector< Device * > devices_
Definition: NTSession.h:291
CancellationManager * cancellation_manager_
Definition: NTSession.h:326
const int64 operation_timeout_in_ms_
Definition: NTSession.h:356
NTSessionFactory *const factory_
Definition: NTSession.h:325
static const int ERROR
tensorflow::NTSession::~NTSession ( )
override

Definition at line 239 of file NTSession.cc.

References cancellation_manager_, Close(), edmIntegrityCheck::d, device_mgr_, flib_def_, and session_handle_.

239  {
240  if (!closed_) Close().IgnoreError();
241  for (auto& it : partial_runs_) {
242  it.second.reset(nullptr);
243  }
244  for (auto& it : executors_) {
245  it.second.reset();
246  }
247  for (auto d : device_mgr_->ListDevices()) {
248  d->op_segment()->RemoveHold(session_handle_);
249  }
250  for (auto d : device_mgr_->ListDevices()) {
251  d->ClearResourceMgr();
252  }
253  functions_.clear();
254  delete cancellation_manager_;
255 
256  execution_state_.reset(nullptr);
257  flib_def_.reset(nullptr);
258 }
::tensorflow::Status Close() override
Definition: NTSession.cc:1392
const std::unique_ptr< const DeviceMgr > device_mgr_
Definition: NTSession.h:290
CancellationManager * cancellation_manager_
Definition: NTSession.h:326
std::unique_ptr< FunctionLibraryDefinition > flib_def_
Definition: NTSession.h:342

Member Function Documentation

Status tensorflow::NTSession::CheckFetch ( const std::vector< std::pair< string, Tensor >> &  feeds,
const std::vector< string > &  fetches,
const ExecutorsAndKeys *  executors_and_keys,
const RunState *  run_state 
)
private

Definition at line 918 of file NTSession.cc.

References executor_lock_, cond::persistency::fetch(), plotBeamSpotDB::first, tensorflow::NTSession::ExecutorsAndKeys::graph, triggerObjects_cff::id, input, checklumidiff::l, gen::n, tensorflow::NTSession::ExecutorsAndKeys::name_to_node, edm::errors::NotFound, tensorflow::NTSession::RunState::pending_inputs, svgfig::stack, and class-composition::visited.

Referenced by PRun().

921  {
922  const Graph* graph = executors_and_keys->graph.get();
923  const NameNodeMap* name_to_node = &executors_and_keys->name_to_node;
924 
925  // Build the set of pending feeds that we haven't seen.
926  std::unordered_set<TensorId, TensorId::Hasher> pending_feeds;
927  {
928  mutex_lock l(executor_lock_);
929  for (const auto& input : run_state->pending_inputs) {
930  // Skip if the feed has already been fed.
931  if (input.second) continue;
932  TensorId id(ParseTensorName(input.first));
933  auto it = name_to_node->find(id.first);
934  if (it == name_to_node->end()) {
935  return errors::NotFound("Feed ", input.first, ": not found");
936  }
937  pending_feeds.insert(id);
938  }
939  }
940  for (const auto& it : feeds) {
941  TensorId id(ParseTensorName(it.first));
942  pending_feeds.erase(id);
943  }
944 
945  // Initialize the stack with the fetch nodes.
946  std::vector<const Node*> stack;
947  for (const string& fetch : fetches) {
948  TensorId id(ParseTensorName(fetch));
949  auto it = name_to_node->find(id.first);
950  if (it == name_to_node->end()) {
951  return errors::NotFound("Fetch ", fetch, ": not found");
952  }
953  stack.push_back(it->second);
954  }
955 
956  // Any tensor needed for fetches can't be in pending_feeds.
957  std::vector<bool> visited(graph->num_node_ids(), false);
958  while (!stack.empty()) {
959  const Node* n = stack.back();
960  stack.pop_back();
961 
962  for (const Edge* in_edge : n->in_edges()) {
963  const Node* in_node = in_edge->src();
964  if (pending_feeds.count({in_node->name(), in_edge->src_output()}) > 0) {
965  return errors::InvalidArgument("Fetch ", in_node->name(), ":",
966  in_edge->src_output(),
967  " can't be computed from the feeds"
968  " that have been fed so far.");
969  }
970  if (!visited[in_node->id()]) {
971  visited[in_node->id()] = true;
972  stack.push_back(in_node);
973  }
974  }
975  }
976  return Status::OK();
977 }
static std::string const input
Definition: EdmProvDump.cc:45
stack
Definition: svgfig.py:558
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
std::pair< std::string, std::shared_ptr< void > > fetch(const cond::Hash &payloadId, Session &session)
Definition: CondDBFetch.cc:325
std::unordered_map< StringPiece, Node *, StringPieceHasher > NameNodeMap
Definition: NTSession.h:84
::tensorflow::Status tensorflow::NTSession::CheckNotClosed ( )
inline private
tensorflow::Status tensorflow::NTSession::Close ( )
override

Definition at line 1392 of file NTSession.cc.

References cancellation_manager_, closed_lock_, tensorflow::NTSessionFactory::Deregister(), factory_, and checklumidiff::l.

Referenced by ~NTSession().

1392  {
1393  cancellation_manager_->StartCancel();
1394  {
1395  mutex_lock l(closed_lock_);
1396  if (closed_) return ::tensorflow::Status::OK();
1397  closed_ = true;
1398  }
1399  if (factory_ != nullptr) factory_->Deregister(this);
1400  return ::tensorflow::Status::OK();
1401 }
void Deregister(const NTSession *session)
Definition: NTSession.cc:155
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
CancellationManager * cancellation_manager_
Definition: NTSession.h:326
NTSessionFactory *const factory_
Definition: NTSession.h:325
Status tensorflow::NTSession::Create ( const GraphDef &  graph)
override

Definition at line 291 of file NTSession.cc.

References ExtendLocked(), graph_def_lock_, init_error_, and checklumidiff::l.

291  {
292  TF_RETURN_IF_ERROR(init_error_);
293  if (graph.node_size() > 0) {
294  mutex_lock l(graph_def_lock_);
295  if (graph_created_) {
296  return errors::AlreadyExists(
297  "A Graph has already been created for this session.");
298  }
299  return ExtendLocked(graph);
300  }
301  return Status::OK();
302 }
::tensorflow::Status ExtendLocked(const GraphDef &graph) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
Definition: NTSession.cc:310
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
Status tensorflow::NTSession::CreateDebuggerState ( const DebugOptions &  debug_options,
int64  session_run_index,
int64  executor_step_index,
const std::vector< string > &  input_names,
const std::vector< string > &  output_names,
const std::vector< string > &  target_names,
std::unique_ptr< DebuggerStateInterface > *  debugger_state 
)
private

Definition at line 334 of file NTSession.cc.

References pfDeepBoostedJetPreprocessParams_cfi::input_names.

Referenced by CheckNotClosed(), and DecorateAndPublishGraphForDebug().

339  {
340  TF_RETURN_IF_ERROR(
341  DebuggerStateRegistry::CreateState(debug_options, debugger_state));
342  TF_RETURN_IF_ERROR(debugger_state->get()->PublishDebugMetadata(
343  debug_options.global_step(), session_run_index, executor_step_index,
344  input_names, output_names, target_names));
345  return Status::OK();
346 }
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
Status tensorflow::NTSession::CreateGraphs ( const BuildGraphOptions &  options,
std::unordered_map< string, std::unique_ptr< Graph >> *  outputs,
std::unique_ptr< FunctionLibraryDefinition > *  flib_def,
RunStateArgs *  run_state_args,
DataTypeVector *  input_types,
DataTypeVector *  output_types 
)
private

Definition at line 1220 of file NTSession.cc.

References KineDebug3::count(), edmIntegrityCheck::d, device_mgr_, device_set_, devices_, edge_name_counter_, flib_def_, tensorflow::NTSession::RunStateArgs::graph, graph_def_lock_, tensorflow::NTSession::RunStateArgs::is_partial_run, checklumidiff::l, eostools::move(), dataset::name, options_, PatBasicFWLiteJetAnalyzer_Selector_cfg::outputs, tablePrinter::prefix, alignCSCRings::s, btagGenBb_cfi::Status, and std::swap().

Referenced by GetOrCreateExecutors().

1225  {
1226  mutex_lock l(graph_def_lock_);
1227  std::unique_ptr<ClientGraph> client_graph;
1228 
1229  std::unique_ptr<GraphExecutionState> temp_exec_state_holder;
1230  GraphExecutionState* execution_state = nullptr;
1231  if (options_.config.graph_options().place_pruned_graph()) {
1232  // Because we are placing pruned graphs, we need to create a
1233  // new GraphExecutionState for every new unseen graph,
1234  // and then place it.
1235  GraphExecutionStateOptions prune_options;
1236  prune_options.device_set = &device_set_;
1237  prune_options.session_options = &options_;
1238  prune_options.stateful_placements = stateful_placements_;
1239  TF_RETURN_IF_ERROR(GraphExecutionState::MakeForPrunedGraph(
1240  execution_state_->original_graph_def().library(), prune_options,
1241  execution_state_->original_graph_def(), subgraph_options,
1242  &temp_exec_state_holder, &client_graph));
1243  execution_state = temp_exec_state_holder.get();
1244  } else {
1245  execution_state = execution_state_.get();
1246  TF_RETURN_IF_ERROR(
1247  execution_state->BuildGraph(subgraph_options, &client_graph));
1248  }
1249 
1250  if (subgraph_options.feed_endpoints.size() !=
1251  client_graph->feed_types.size()) {
1252  return errors::Internal(
1253  "Graph pruning failed: requested number of feed endpoints = ",
1254  subgraph_options.feed_endpoints.size(),
1255  " versus number of pruned feed endpoints = ",
1256  client_graph->feed_types.size());
1257  }
1258  if (subgraph_options.fetch_endpoints.size() !=
1259  client_graph->fetch_types.size()) {
1260  return errors::Internal(
1261  "Graph pruning failed: requested number of fetch endpoints = ",
1262  subgraph_options.fetch_endpoints.size(),
1263  " versus number of pruned fetch endpoints = ",
1264  client_graph->fetch_types.size());
1265  }
1266 
1267  auto current_stateful_placements = execution_state->GetStatefulPlacements();
1268  // Update our current state based on the execution_state's
1269  // placements. If there are any mismatches for a node,
1270  // we should fail, as this should never happen.
1271  for (auto placement_pair : current_stateful_placements) {
1272  const string& node_name = placement_pair.first;
1273  const string& placement = placement_pair.second;
1274  auto iter = stateful_placements_.find(node_name);
1275  if (iter == stateful_placements_.end()) {
1276  stateful_placements_.insert(std::make_pair(node_name, placement));
1277  } else if (iter->second != placement) {
1278  return errors::Internal(
1279  "Stateful placement mismatch. "
1280  "Current assignment of ",
1281  node_name, " to ", iter->second, " does not match ", placement);
1282  }
1283  }
1284 
1285  stateful_placements_ = execution_state->GetStatefulPlacements();
1286 
1287  // Remember the graph in run state if this is a partial run.
1288  if (run_state_args->is_partial_run) {
1289  run_state_args->graph.reset(new Graph(flib_def_.get()));
1290  CopyGraph(*execution_state->full_graph(), run_state_args->graph.get());
1291  }
1292 
1293  // Partition the graph across devices.
1294  PartitionOptions popts;
1295  popts.node_to_loc = [](const Node* node) {
1296  assert(node != nullptr);
1297  return node->assigned_device_name();
1298  };
1299  popts.new_name = [this](const string& prefix) {
1300  return strings::StrCat(prefix, "/_", edge_name_counter_.fetch_add(1));
1301  };
1302  popts.get_incarnation = [](const string& name) {
1303  // The direct session does not have changing incarnation numbers.
1304  // Just return '1'.
1305  return 1;
1306  };
1307  popts.flib_def = &client_graph->graph.flib_def();
1308  popts.control_flow_added = false;
1309 
1310  std::unordered_map<string, GraphDef> partitions;
1311  TF_RETURN_IF_ERROR(Partition(popts, &client_graph->graph, &partitions));
1312 
1313  std::vector<string> device_names;
1314  for (auto device : devices_) {
1315  // Extract the LocalName from the device.
1316  device_names.push_back(DeviceNameUtils::LocalName(device->name()));
1317  }
1318 
1319  // Check for valid partitions.
1320  for (const auto& partition : partitions) {
1321  const string local_partition_name =
1322  DeviceNameUtils::LocalName(partition.first);
1323  if (std::count(device_names.begin(), device_names.end(),
1324  local_partition_name) == 0) {
1325  return errors::InvalidArgument(
1326  "Creating a partition for ", local_partition_name,
1327  " which doesn't exist in the list of available devices. Available "
1328  "devices: ",
1329  str_util::Join(device_names, ","));
1330  }
1331  }
1332 
1333  for (const auto& partition : partitions) {
1334  std::unique_ptr<Graph> device_graph(
1335  new Graph(client_graph->flib_def.get()));
1336  GraphConstructorOptions device_opts;
1337  // There are internal operations (e.g., send/recv) that we now allow.
1338  device_opts.allow_internal_ops = true;
1339  device_opts.expect_device_spec = true;
1340  TF_RETURN_IF_ERROR(ConvertGraphDefToGraph(device_opts, partition.second,
1341  device_graph.get()));
1342  outputs->emplace(partition.first, std::move(device_graph));
1343  }
1344 
1345  GraphOptimizationPassOptions optimization_options;
1346  optimization_options.session_options = &options_;
1347  optimization_options.flib_def = client_graph->flib_def.get();
1348  optimization_options.partition_graphs = outputs;
1349  TF_RETURN_IF_ERROR(OptimizationPassRegistry::Global()->RunGrouping(
1350  OptimizationPassRegistry::POST_PARTITIONING, optimization_options));
1351 
1352  Status s;
1353  for (auto& partition : *outputs) {
1354  const string& partition_name = partition.first;
1355  std::unique_ptr<Graph>* graph = &partition.second;
1356 
1357  VLOG(2) << "Created " << DebugString(graph->get()) << " for "
1358  << partition_name;
1359 
1360  // Give the device an opportunity to rewrite its subgraph.
1361  Device* d;
1362  s = device_mgr_->LookupDevice(partition_name, &d);
1363  if (!s.ok()) break;
1364  s = d->MaybeRewriteGraph(graph);
1365  if (!s.ok()) {
1366  break;
1367  }
1368  }
1369  *flib_def = std::move(client_graph->flib_def);
1370  std::swap(*input_types, client_graph->feed_types);
1371  std::swap(*output_types, client_graph->fetch_types);
1372  return s;
1373 }
const SessionOptions options_
Definition: NTSession.h:287
DeviceSet device_set_
Definition: NTSession.h:292
Partition
Definition: HLTHPDFilter.cc:32
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
std::atomic< int64 > edge_name_counter_
Definition: NTSession.h:349
const std::unique_ptr< const DeviceMgr > device_mgr_
Definition: NTSession.h:290
std::vector< Device * > devices_
Definition: NTSession.h:291
std::unique_ptr< FunctionLibraryDefinition > flib_def_
Definition: NTSession.h:342
DDCompactView::Graph Graph
def move(src, dest)
Definition: eostools.py:511
Status tensorflow::NTSession::DecorateAndPublishGraphForDebug ( const DebugOptions &  debug_options,
Graph *  graph,
Device *  device 
)
private

Definition at line 348 of file NTSession.cc.

References createfilelist::args, EnergyCorrector::c, cancellation_manager_, CheckNotClosed(), cost_model_manager_, CreateDebuggerState(), device_mgr_, devices_, executor_lock_, tensorflow::NTSession::RunState::executors_done, tensorflow::NTSession::PerPartitionExecutorsAndLib::flib, GetOrCreateExecutors(), tensorflow::NTSession::PerPartitionExecutorsAndLib::graph, graph_def_lock_, mps_fire::i, tensorflow::NTSession::ExecutorsAndKeys::input_name_to_index, tensorflow::NTSession::ExecutorsAndKeys::input_types, PatBasicFWLiteJetAnalyzer_Selector_cfg::inputs, tensorflow::NTSession::ExecutorsAndKeys::items, checklumidiff::l, eostools::move(), operation_timeout_in_ms_, options_, tensorflow::NTSession::ExecutorsAndKeys::output_types, PatBasicFWLiteJetAnalyzer_Selector_cfg::outputs, tensorflow::NTSession::RunState::rendez, ResourceHandleToInputTensor(), Run(), alignCSCRings::s, SchedClosure(), session_state_, btagGenBb_cfi::Status, tensorflow::NTSession::ExecutorsAndKeys::step_count, step_id_counter_, sync_on_finish_, and WaitForNotification().

Referenced by CheckNotClosed(), and GetOrCreateExecutors().

349  {
350  std::unique_ptr<DebugGraphDecoratorInterface> decorator;
351  TF_RETURN_IF_ERROR(
352  DebugGraphDecoratorRegistry::CreateDecorator(debug_options, &decorator));
353 
354  TF_RETURN_IF_ERROR(decorator->DecorateGraph(graph, device));
355  TF_RETURN_IF_ERROR(decorator->PublishGraph(*graph, device->name()));
356  return Status::OK();
357 }
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
void tensorflow::NTSession::ExportCostModels ( CostModelManager::CostModelMap *  cost_models)
inline

Definition at line 123 of file NTSession.h.

References cost_model_manager_.

123  {
124  cost_model_manager_.ExportCostModels(cost_models);
125  }
CostModelManager cost_model_manager_
Definition: NTSession.h:359
Status tensorflow::NTSession::Extend ( const GraphDef &  graph)
override

Definition at line 304 of file NTSession.cc.

References CheckNotClosed(), ExtendLocked(), graph_def_lock_, and checklumidiff::l.

304  {
305  TF_RETURN_IF_ERROR(CheckNotClosed());
306  mutex_lock l(graph_def_lock_);
307  return ExtendLocked(graph);
308 }
::tensorflow::Status CheckNotClosed()
Definition: NTSession.h:271
::tensorflow::Status ExtendLocked(const GraphDef &graph) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
Definition: NTSession.cc:310
Status tensorflow::NTSession::ExtendLocked ( const GraphDef &  graph)
private

Definition at line 310 of file NTSession.cc.

References flib_def_, and MaybeInitializeExecutionState().

Referenced by Create(), and Extend().

310  {
311  bool already_initialized;
312  // If this is the first call, we can initialize the execution state
313  // with `graph` and do not need to call `Extend()`.
314  TF_RETURN_IF_ERROR(
315  MaybeInitializeExecutionState(graph, &already_initialized));
316  if (already_initialized) {
317  TF_RETURN_IF_ERROR(flib_def_->AddLibrary(graph.library()));
318  std::unique_ptr<GraphExecutionState> state;
319  TF_RETURN_IF_ERROR(execution_state_->Extend(graph, &state));
320  execution_state_.swap(state);
321  }
322  return Status::OK();
323 }
Status MaybeInitializeExecutionState(const GraphDef &graph, bool *out_already_initialized) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
Definition: NTSession.cc:260
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
std::unique_ptr< FunctionLibraryDefinition > flib_def_
Definition: NTSession.h:342
Status tensorflow::NTSession::GetOrCreateExecutors ( gtl::ArraySlice< string >  inputs,
gtl::ArraySlice< string >  outputs,
gtl::ArraySlice< string >  target_nodes,
ExecutorsAndKeys **  executors_and_keys,
RunStateArgs *  run_state_args 
)
private

Definition at line 979 of file NTSession.cc.

References CreateGraphs(), tensorflow::NTSession::RunStateArgs::debug_options, DecorateAndPublishGraphForDebug(), device_mgr_, device_set_, executor_lock_, plotBeamSpotDB::first, tensorflow::NTSession::RunStateArgs::graph, graph_def_lock_, cuy::graphs, tensorflow::NTSession::RunStateArgs::handle, handle_name_counter_, mps_fire::i, triggerObjects_cff::id, input, tensorflow::NTSession::RunStateArgs::is_partial_run, crabWrapper::key, checklumidiff::l, mps_check::lib, eostools::move(), gen::n, names, node_outputs_callback_, AlcaSiPixelAliHarvester0T_cff::options, options_, convertSQLitetoXML_cfg::output, and session_handle_.

Referenced by DecorateAndPublishGraphForDebug(), and PRunSetup().

982  {
983  int64 handle_name_counter_value = -1;
984  if (LogMemory::IsEnabled() || run_state_args->is_partial_run) {
985  handle_name_counter_value = handle_name_counter_.fetch_add(1);
986  }
987 
988  string debug_tensor_watches_summary;
989  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
990  debug_tensor_watches_summary = SummarizeDebugTensorWatches(
991  run_state_args->debug_options.debug_tensor_watch_opts());
992  }
993 
994  // Fast lookup path, no sorting.
995  const string key = strings::StrCat(
996  str_util::Join(inputs, ","), "->", str_util::Join(outputs, ","), "/",
997  str_util::Join(target_nodes, ","), "/", run_state_args->is_partial_run,
998  "/", debug_tensor_watches_summary);
999  // Set the handle, if it's needed to log memory or for partial run.
1000  if (handle_name_counter_value >= 0) {
1001  run_state_args->handle =
1002  strings::StrCat(key, ";", handle_name_counter_value);
1003  }
1004 
1005  // See if we already have the executors for this run.
1006  {
1007  mutex_lock l(executor_lock_); // could use reader lock
1008  auto it = executors_.find(key);
1009  if (it != executors_.end()) {
1010  *executors_and_keys = it->second.get();
1011  return Status::OK();
1012  }
1013  }
1014 
1015  // Slow lookup path, the unsorted key missed the cache.
1016  // Sort the inputs and outputs, and look up with the sorted key in case an
1017  // earlier call used a different order of inputs and outputs.
1018  //
1019  // We could consider some other signature instead of sorting that
1020  // preserves the same property to avoid the sort in the future.
1021  std::vector<string> inputs_sorted(inputs.begin(), inputs.end());
1022  std::sort(inputs_sorted.begin(), inputs_sorted.end());
1023  std::vector<string> outputs_sorted(outputs.begin(), outputs.end());
1024  std::sort(outputs_sorted.begin(), outputs_sorted.end());
1025  std::vector<string> tn_sorted(target_nodes.begin(), target_nodes.end());
1026  std::sort(tn_sorted.begin(), tn_sorted.end());
1027 
1028  const string sorted_key = strings::StrCat(
1029  str_util::Join(inputs_sorted, ","), "->",
1030  str_util::Join(outputs_sorted, ","), "/", str_util::Join(tn_sorted, ","),
1031  "/", run_state_args->is_partial_run, "/", debug_tensor_watches_summary);
1032  // Set the handle, if it's needed to log memory or for partial run.
1033  if (handle_name_counter_value >= 0) {
1034  run_state_args->handle =
1035  strings::StrCat(sorted_key, ";", handle_name_counter_value);
1036  }
1037 
1038  // See if we already have the executors for this run.
1039  {
1040  mutex_lock l(executor_lock_);
1041  auto it = executors_.find(sorted_key);
1042  if (it != executors_.end()) {
1043  *executors_and_keys = it->second.get();
1044  // Insert this under the original key.
1045  executors_.emplace(key, it->second);
1046  return Status::OK();
1047  }
1048  }
1049 
1050  // Nothing found, so create the executors and store in the cache.
1051  BuildGraphOptions options;
1052  options.feed_endpoints = inputs_sorted;
1053  options.fetch_endpoints = outputs_sorted;
1054  options.target_nodes = tn_sorted;
1055  options.use_function_convention = !run_state_args->is_partial_run;
1056  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
1057  options.debug_options = run_state_args->debug_options;
1058  }
1059 
1060  std::unique_ptr<FunctionInfo> func_info(new FunctionInfo);
1061  std::shared_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys);
1062 
1063  // The executor_lock_ is intentionally released while executor is
1064  // being created.
1065  std::unordered_map<string, std::unique_ptr<Graph>> graphs;
1066  TF_RETURN_IF_ERROR(CreateGraphs(options, &graphs, &func_info->flib_def,
1067  run_state_args, &ek->input_types,
1068  &ek->output_types));
1069 
1070  if (run_state_args->is_partial_run) {
1071  ek->graph = std::move(run_state_args->graph);
1072  std::unordered_set<StringPiece, StringPieceHasher> names;
1073  for (const string& input : inputs) {
1074  TensorId id(ParseTensorName(input));
1075  names.emplace(id.first);
1076  }
1077  for (const string& output : outputs) {
1078  TensorId id(ParseTensorName(output));
1079  names.emplace(id.first);
1080  }
1081  for (Node* n : ek->graph->nodes()) {
1082  if (names.count(n->name()) > 0) {
1083  ek->name_to_node.insert({n->name(), n});
1084  }
1085  }
1086  }
1087  ek->items.reserve(graphs.size());
1088  const auto& optimizer_opts =
1089  options_.config.graph_options().optimizer_options();
1090 
1091  int graph_def_version;
1092  {
1093  mutex_lock l(graph_def_lock_);
1094  graph_def_version =
1095  execution_state_->original_graph_def().versions().producer();
1096  }
1097  func_info->proc_flr.reset(new ProcessFunctionLibraryRuntime(
1098  device_mgr_.get(), options_.env, graph_def_version,
1099  func_info->flib_def.get(), optimizer_opts));
1100 
1101  GraphOptimizer optimizer(optimizer_opts);
1102  for (auto iter = graphs.begin(); iter != graphs.end(); ++iter) {
1103  const string& partition_name = iter->first;
1104  std::unique_ptr<Graph>& partition_graph = iter->second;
1105 
1106  Device* device;
1107  TF_RETURN_IF_ERROR(device_mgr_->LookupDevice(partition_name, &device));
1108 
1109  ek->items.resize(ek->items.size() + 1);
1110  auto* item = &(ek->items.back());
1111  auto lib = func_info->proc_flr->GetFLR(partition_name);
1112  if (lib == nullptr) {
1113  return errors::Internal("Could not find device: ", partition_name);
1114  }
1115  item->flib = lib;
1116 
1117  LocalExecutorParams params;
1118  params.device = device;
1119  params.function_library = lib;
1120  auto opseg = device->op_segment();
1121  params.create_kernel = [this, lib, opseg](const NodeDef& ndef,
1122  OpKernel** kernel) {
1123  // We do not share the kernel via the OpSegment if the node is
1124  // stateless, or a function.
1125  // NOTE(mrry): We must not share function kernels (implemented
1126  // using `CallOp`) between subgraphs, because `CallOp::handle_`
1127  // is tied to a particular subgraph. Even if the function itself
1128  // is stateful, the `CallOp` that invokes it is not.
1129  if (!lib->IsStateful(ndef.op()) ||
1130  lib->GetFunctionLibraryDefinition()->Find(ndef.op()) != nullptr) {
1131  return lib->CreateKernel(ndef, kernel);
1132  }
1133  auto create_fn = [lib, &ndef](OpKernel** kernel) {
1134  return lib->CreateKernel(ndef, kernel);
1135  };
1136  // Kernels created for subgraph nodes need to be cached. On
1137  // cache miss, create_fn() is invoked to create a kernel based
1138  // on the function library here + global op registry.
1139  return opseg->FindOrCreate(session_handle_, ndef.name(), kernel,
1140  create_fn);
1141  };
1142  params.delete_kernel = [lib](OpKernel* kernel) {
1143  // If the node is stateful, opseg owns it. Otherwise, delete it.
1144  if (kernel && !lib->IsStateful(kernel->type_string())) {
1145  delete kernel;
1146  }
1147  };
1148  params.node_outputs_cb = node_outputs_callback_;
1149 
1150  optimizer.Optimize(lib, options_.env, device, &iter->second,
1151  /*shape_map=*/nullptr);
1152 
1153  // EXPERIMENTAL: tfdbg inserts debug nodes in the graph.
1154  if (!options.debug_options.debug_tensor_watch_opts().empty()) {
1155  TF_RETURN_IF_ERROR(DecorateAndPublishGraphForDebug(
1156  options.debug_options, partition_graph.get(), params.device));
1157  }
1158 
1159  TF_RETURN_IF_ERROR(EnsureMemoryTypes(DeviceType(device->device_type()),
1160  device->name(),
1161  partition_graph.get()));
1162  // NewLocalExecutor takes ownership of partition_graph.
1163  item->graph = partition_graph.get();
1164  item->executor = nullptr;
1165  item->device = device;
1166  Executor* executor;
1167  TF_RETURN_IF_ERROR(
1168  NewLocalExecutor(params, partition_graph.release(), &executor));
1169  item->executor.reset(executor);
1170  }
1171 
1172  // Cache the mapping from input/output names to graph elements to
1173  // avoid recomputing it every time.
1174  if (!run_state_args->is_partial_run) {
1175  // For regular `Run()`, we use the function calling convention, and so
1176  // maintain a mapping from input/output names to
1177  // argument/return-value ordinal index.
1178  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
1179  const string& input = inputs_sorted[i];
1180  ek->input_name_to_index[input] = i;
1181  }
1182  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
1183  const string& output = outputs_sorted[i];
1184  ek->output_name_to_index[output] = i;
1185  }
1186  } else {
1187  // For `PRun()`, we use the rendezvous calling convention, and so
1188  // maintain a mapping from input/output names to rendezvous keys.
1189  //
1190  // We always use the first device as the device name portion of the
1191  // key, even if we're feeding another graph.
1192  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
1193  const string& input = inputs_sorted[i];
1194  ek->input_name_to_rendezvous_key[input] = GetRendezvousKey(
1195  input, device_set_.client_device()->attributes(), FrameAndIter(0, 0));
1196  }
1197  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
1198  const string& output = outputs_sorted[i];
1199  ek->output_name_to_rendezvous_key[output] =
1200  GetRendezvousKey(output, device_set_.client_device()->attributes(),
1201  FrameAndIter(0, 0));
1202  }
1203  }
1204 
1205  // Reacquire the lock, try to insert into the map.
1206  mutex_lock l(executor_lock_);
1207  functions_.push_back(std::move(func_info));
1208 
1209  // Another thread may have created the entry before us, in which case we will
1210  // reuse the already created one.
1211  auto insert_result = executors_.emplace(sorted_key, ek);
1212  // Insert the value under the original key, so the fast path lookup will work
1213  // if the user uses the same order of inputs, outputs, and targets again.
1214  executors_.emplace(key, insert_result.first->second);
1215  *executors_and_keys = insert_result.first->second.get();
1216 
1217  return Status::OK();
1218 }
const SessionOptions options_
Definition: NTSession.h:287
::tensorflow::Status DecorateAndPublishGraphForDebug(const DebugOptions &debug_options, Graph *graph, Device *device)
Definition: NTSession.cc:348
const std::string names[nVars_]
static std::string const input
Definition: EdmProvDump.cc:45
DeviceSet device_set_
Definition: NTSession.h:292
std::atomic< int64 > handle_name_counter_
Definition: NTSession.h:350
const std::unique_ptr< const DeviceMgr > device_mgr_
Definition: NTSession.h:290
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
::tensorflow::Status CreateGraphs(const BuildGraphOptions &options, std::unordered_map< string, std::unique_ptr< Graph >> *outputs, std::unique_ptr< FunctionLibraryDefinition > *flib_def, RunStateArgs *run_state_args, DataTypeVector *input_types, DataTypeVector *output_types)
Definition: NTSession.cc:1220
Executor::Args::NodeOutputsCallback node_outputs_callback_
Definition: NTSession.h:361
graphs
Definition: cuy.py:962
def move(src, dest)
Definition: eostools.py:511
bool graph_created_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
GraphDef graph_def_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
std::vector<std::unique_ptr<FunctionInfo> > functions_ tensorflow::NTSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, std::shared_ptr<ExecutorsAndKeys> > executors_ tensorflow::NTSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, std::unique_ptr<RunState> > partial_runs_ tensorflow::NTSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, string> stateful_placements_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
std::unique_ptr<GraphExecutionState> execution_state_ tensorflow::NTSession::GUARDED_BY ( graph_def_lock_  )
private
bool closed_ tensorflow::NTSession::GUARDED_BY ( closed_lock_  )
private
tensorflow::Status tensorflow::NTSession::ListDevices ( std::vector< DeviceAttributes > *  response)
override

Definition at line 1375 of file NTSession.cc.

References edmIntegrityCheck::d, and devices_.

1376  {
1377  response->clear();
1378  response->reserve(devices_.size());
1379  for (Device* d : devices_) {
1380  const DeviceAttributes& attrs = d->attributes();
1381  response->emplace_back(attrs);
1382  }
1383  return ::tensorflow::Status::OK();
1384 }
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
std::vector< Device * > devices_
Definition: NTSession.h:291
::tensorflow::Status tensorflow::NTSession::LocalDeviceManager ( const DeviceMgr **  output)
inlineoverride

Definition at line 118 of file NTSession.h.

References device_mgr_.

118  {
119  *output = device_mgr_.get();
120  return ::tensorflow::Status::OK();
121  }
const std::unique_ptr< const DeviceMgr > device_mgr_
Definition: NTSession.h:290
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
Status tensorflow::NTSession::MaybeInitializeExecutionState ( const GraphDef &  graph,
bool *  out_already_initialized 
)
private

Definition at line 260 of file NTSession.cc.

References device_set_, flib_def_, AlcaSiPixelAliHarvester0T_cff::options, options_, and groupFilesInBlocks::temp.

Referenced by ExtendLocked().

261  {
262  // If already initialized, do nothing.
263  if (flib_def_ && execution_state_) {
264  *out_already_initialized = true;
265  return Status::OK();
266  }
267  // Set up the per-session execution state.
268  // NOTE(mrry): The function library created here will be used for
269  // all subsequent extensions of the graph.
270  flib_def_.reset(
271  new FunctionLibraryDefinition(OpRegistry::Global(), graph.library()));
272  GraphExecutionStateOptions options;
273  options.device_set = &device_set_;
274  options.session_options = &options_;
275  // TODO(mrry,suharshs): We explicitly copy `graph` so that
276  // `MakeForBaseGraph()` can take ownership of its
277  // contents. Previously this happened implicitly in calls to the
278  // `GraphExecutionState`. Other sessions call
279  // `MakeForBaseGraph` in such a way that we can destructively read
280  // the passed-in `GraphDef`. In principle we could do the same here,
281  // with a wider refactoring; we might revise the direct session so
282  // that it copies the graph fewer times.
283  GraphDef temp(graph);
284  TF_RETURN_IF_ERROR(
285  GraphExecutionState::MakeForBaseGraph(&temp, options, &execution_state_));
286  graph_created_ = true;
287  *out_already_initialized = false;
288  return Status::OK();
289 }
const SessionOptions options_
Definition: NTSession.h:287
DeviceSet device_set_
Definition: NTSession.h:292
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
std::unique_ptr< FunctionLibraryDefinition > flib_def_
Definition: NTSession.h:342
Status tensorflow::NTSession::PRun ( const string &  handle,
const NamedTensorList inputs,
const std::vector< string > &  output_names,
std::vector< Tensor > *  outputs 
)
override

Definition at line 707 of file NTSession.cc.

References cancellation_manager_, CheckFetch(), CheckNotClosed(), executor_lock_, input, crabWrapper::key, checklumidiff::l, LOG, tensorflow::NTSession::RunState::mu_, dataset::name, operation_timeout_in_ms_, convertSQLitetoXML_cfg::output, CfgNavigationSchool_cfi::parts, tensorflow::NTSession::RunState::pending_inputs, tensorflow::NTSession::RunState::pending_outputs, tensorflow::NTSession::RunState::PendingDone(), RecvPRunOutputs(), tensorflow::NTSession::RunState::rendez, alignCSCRings::s, SendPRunInputs(), session_state_, btagGenBb_cfi::Status, tensorflow::NTSession::RunState::tensor_store, WaitForNotification(), and dqm::qstatus::WARNING.

709  {
710  TF_RETURN_IF_ERROR(CheckNotClosed());
711  std::vector<string> parts = str_util::Split(handle, ';');
712  const string& key = parts[0];
713  // Get the executors for this partial run.
714  ExecutorsAndKeys* executors_and_keys;
715  RunState* run_state;
716  {
717  mutex_lock l(executor_lock_); // could use reader lock
718  auto exc_it = executors_.find(key);
719  if (exc_it == executors_.end()) {
720  return errors::InvalidArgument(
721  "Must run 'setup' before performing partial runs!");
722  }
723  executors_and_keys = exc_it->second.get();
724 
725  auto prun_it = partial_runs_.find(handle);
726  if (prun_it == partial_runs_.end()) {
727  return errors::InvalidArgument(
728  "Must run 'setup' before performing partial runs!");
729  }
730  run_state = prun_it->second.get();
731 
732  // Make sure that this is a new set of feeds that are still pending.
733  for (const auto& input : inputs) {
734  auto it = run_state->pending_inputs.find(input.first);
735  if (it == run_state->pending_inputs.end()) {
736  return errors::InvalidArgument(
737  "The feed ", input.first,
738  " was not specified in partial_run_setup.");
739  } else if (it->second) {
740  return errors::InvalidArgument("The feed ", input.first,
741  " has already been fed.");
742  }
743  }
744  // Check that this is a new set of fetches that are still pending.
745  for (const auto& output : output_names) {
746  auto it = run_state->pending_outputs.find(output);
747  if (it == run_state->pending_outputs.end()) {
748  return errors::InvalidArgument(
749  "The fetch ", output, " was not specified in partial_run_setup.");
750  } else if (it->second) {
751  return errors::InvalidArgument("The fetch ", output,
752  " has already been fetched.");
753  }
754  }
755  }
756 
757  // Check that this new set of fetches can be computed from all the
758  // feeds we have supplied.
759  TF_RETURN_IF_ERROR(
760  CheckFetch(inputs, output_names, executors_and_keys, run_state));
761 
762  // Send inputs.
763  Status s = SendPRunInputs(inputs, executors_and_keys, run_state->rendez);
764 
765  // Receive outputs.
766  if (s.ok()) {
767  s = RecvPRunOutputs(output_names, executors_and_keys, run_state, outputs);
768  }
769 
770  // Save the output tensors of this run we choose to keep.
771  if (s.ok()) {
772  s = run_state->tensor_store.SaveTensors(output_names, &session_state_);
773  }
774 
775  {
776  mutex_lock l(executor_lock_);
777  // Delete the run state if there is an error or all fetches are done.
778  bool done = true;
779  if (s.ok()) {
780  {
781  mutex_lock l(run_state->mu_);
782  if (!run_state->status.ok()) {
783  LOG(WARNING) << "An error unrelated to this prun has been detected. "
784  << run_state->status;
785  }
786  }
787  for (const auto& input : inputs) {
788  auto it = run_state->pending_inputs.find(input.first);
789  it->second = true;
790  }
791  for (const auto& name : output_names) {
792  auto it = run_state->pending_outputs.find(name);
793  it->second = true;
794  }
795  done = run_state->PendingDone();
796  }
797  if (done) {
798  WaitForNotification(run_state, cancellation_manager_,
799  operation_timeout_in_ms_);
800  partial_runs_.erase(handle);
801  }
802  }
803 
804  return s;
805 }
static const int WARNING
SessionState session_state_
Definition: NTSession.h:323
#define LOG(A)
::tensorflow::Status SendPRunInputs(const std::vector< std::pair< string, Tensor >> &inputs, const ExecutorsAndKeys *executors_and_keys, IntraProcessRendezvous *rendez)
Definition: NTSession.cc:833
static std::string const input
Definition: EdmProvDump.cc:45
::tensorflow::Status CheckNotClosed()
Definition: NTSession.h:271
::tensorflow::Status WaitForNotification(Notification *n, int64 timeout_in_ms)
Definition: NTSession.cc:1465
::tensorflow::Status CheckFetch(const std::vector< std::pair< string, Tensor >> &feeds, const std::vector< string > &fetches, const ExecutorsAndKeys *executors_and_keys, const RunState *run_state)
Definition: NTSession.cc:918
::tensorflow::Status RecvPRunOutputs(const std::vector< string > &output_names, const ExecutorsAndKeys *executors_and_keys, RunState *run_state, std::vector< Tensor > *outputs)
Definition: NTSession.cc:872
CancellationManager * cancellation_manager_
Definition: NTSession.h:326
const int64 operation_timeout_in_ms_
Definition: NTSession.h:356
Status tensorflow::NTSession::PRunSetup ( const std::vector< string > &  input_names,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
string *  handle 
)
override

Definition at line 630 of file NTSession.cc.

References createfilelist::args, EnergyCorrector::c, cancellation_manager_, CheckNotClosed(), device_mgr_, devices_, executor_lock_, tensorflow::NTSession::RunState::executors_done, GetOrCreateExecutors(), graph_def_lock_, tensorflow::NTSession::RunStateArgs::handle, cmsBatch::handle, tensorflow::NTSession::RunStateArgs::is_partial_run, tensorflow::NTSession::ExecutorsAndKeys::items, checklumidiff::l, eostools::move(), options_, tensorflow::NTSession::RunState::rendez, SchedClosure(), session_state_, btagGenBb_cfi::Status, step_id_counter_, and sync_on_finish_.

633  {
634  TF_RETURN_IF_ERROR(CheckNotClosed());
635  {
636  mutex_lock l(graph_def_lock_);
637  if (!graph_created_) {
638  return errors::InvalidArgument(
639  "Session was not created with a graph before PRunSetup()!");
640  }
641  }
642 
643  // Check if we already have an executor for these arguments.
644  ExecutorsAndKeys* executors_and_keys;
645  // TODO(cais): TFDBG support for partial runs.
646  DebugOptions debug_options;
647  RunStateArgs run_state_args(debug_options);
648  run_state_args.is_partial_run = true;
649  TF_RETURN_IF_ERROR(GetOrCreateExecutors(input_names, output_names,
650  target_nodes, &executors_and_keys,
651  &run_state_args));
652 
653  // Create the run state and save it for future PRun calls.
654  Executor::Args args;
655  args.step_id = step_id_counter_.fetch_add(1);
656  RunState* run_state =
657  new RunState(input_names, output_names, args.step_id, &devices_);
658  run_state->rendez = new IntraProcessRendezvous(device_mgr_.get());
659  {
660  mutex_lock l(executor_lock_);
661  if (!partial_runs_
662  .emplace(run_state_args.handle,
663  std::unique_ptr<RunState>(run_state))
664  .second) {
665  return errors::Internal("The handle '", run_state_args.handle,
666  "' created for this partial run is not unique.");
667  }
668  }
669 
670  // Start parallel Executors.
671  const size_t num_executors = executors_and_keys->items.size();
672  ExecutorBarrier* barrier = new ExecutorBarrier(
673  num_executors, run_state->rendez, [run_state](const Status& ret) {
674  if (!ret.ok()) {
675  mutex_lock l(run_state->mu_);
676  run_state->status.Update(ret);
677  }
678  run_state->executors_done.Notify();
679  });
680 
681  args.rendezvous = run_state->rendez;
682  args.cancellation_manager = cancellation_manager_;
683  args.runner = [this](Executor::Args::Closure c) {
684  SchedClosure(std::move(c));
685  };
686  args.session_state = &session_state_;
687  args.tensor_store = &run_state->tensor_store;
688  args.step_container = &run_state->step_container;
689  if (LogMemory::IsEnabled()) {
690  LogMemory::RecordStep(args.step_id, run_state_args.handle);
691  }
692  args.sync_on_finish = sync_on_finish_;
693 
694  if (options_.config.graph_options().build_cost_model()) {
695  run_state->collector.reset(new StepStatsCollector(nullptr));
696  args.stats_collector = run_state->collector.get();
697  }
698 
699  for (auto& item : executors_and_keys->items) {
700  item.executor->RunAsync(args, barrier->Get());
701  }
702 
703  *handle = run_state_args.handle;
704  return Status::OK();
705 }
static std::atomic_int_fast64_t step_id_counter_
Definition: NTSession.h:353
const SessionOptions options_
Definition: NTSession.h:287
SessionState session_state_
Definition: NTSession.h:323
::tensorflow::Status CheckNotClosed()
Definition: NTSession.h:271
const std::unique_ptr< const DeviceMgr > device_mgr_
Definition: NTSession.h:290
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
std::vector< Device * > devices_
Definition: NTSession.h:291
void SchedClosure(std::function< void()> c)
Definition: NTSession.cc:192
::tensorflow::Status GetOrCreateExecutors(gtl::ArraySlice< string > inputs, gtl::ArraySlice< string > outputs, gtl::ArraySlice< string > target_nodes, ExecutorsAndKeys **executors_and_keys, RunStateArgs *run_state_args)
Definition: NTSession.cc:979
CancellationManager * cancellation_manager_
Definition: NTSession.h:326
def move(src, dest)
Definition: eostools.py:511
Status tensorflow::NTSession::RecvPRunOutputs ( const std::vector< string > &  output_names,
const ExecutorsAndKeys executors_and_keys,
RunState run_state,
std::vector< Tensor > *  outputs 
)
private

Definition at line 872 of file NTSession.cc.

References operation_timeout_in_ms_, tensorflow::NTSession::ExecutorsAndKeys::output_name_to_rendezvous_key, tensorflow::NTSession::RunState::rendez, alignCSCRings::s, and btagGenBb_cfi::Status.

Referenced by PRun().

875  {
876  Status s;
877  if (!output_names.empty()) {
878  outputs->resize(output_names.size());
879  }
880 
881  Rendezvous::ParsedKey parsed;
882  // Get the outputs from the rendezvous
883  for (size_t output_offset = 0; output_offset < output_names.size();
884  ++output_offset) {
885  const string& output_name = output_names[output_offset];
886  auto it =
887  executors_and_keys->output_name_to_rendezvous_key.find(output_name);
888  if (it == executors_and_keys->output_name_to_rendezvous_key.end()) {
889  return errors::Internal("'", output_name,
890  "' is not a pre-defined fetch.");
891  }
892  const string& output_key = it->second;
893  Tensor output_tensor;
894  bool is_dead;
895  IntraProcessRendezvous* rendez = run_state->rendez;
896 
897  s = Rendezvous::ParseKey(output_key, &parsed);
898  if (s.ok()) {
899  // Fetch data from the Rendezvous.
900  s = rendez->Recv(parsed, Rendezvous::Args(), &output_tensor, &is_dead,
901  operation_timeout_in_ms_);
902  if (is_dead && s.ok()) {
903  s = errors::InvalidArgument("The tensor returned for ", output_name,
904  " was not valid.");
905  }
906  }
907  if (!s.ok()) {
908  rendez->StartAbort(s);
909  outputs->clear();
910  return s;
911  }
912 
913  (*outputs)[output_offset] = output_tensor;
914  }
915  return Status::OK();
916 }
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
const int64 operation_timeout_in_ms_
Definition: NTSession.h:356
tensorflow::Status tensorflow::NTSession::Reset ( const std::vector< string > &  containers)

Definition at line 1386 of file NTSession.cc.

References device_mgr_.

1387  {
1388  device_mgr_->ClearContainers(containers);
1389  return ::tensorflow::Status::OK();
1390 }
const std::unique_ptr< const DeviceMgr > device_mgr_
Definition: NTSession.h:290
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
Status tensorflow::NTSession::ResourceHandleToInputTensor ( const Tensor &  resource_tensor,
Tensor *  retrieved_tensor 
)
private

Definition at line 807 of file NTSession.cc.

References session_state_.

Referenced by DecorateAndPublishGraphForDebug(), and SendPRunInputs().

808  {
809  if (resource_tensor.dtype() != DT_RESOURCE) {
810  return errors::InvalidArgument(strings::StrCat(
811  "ResourceHandleToInputTensor() received non-DT_RESOURCE Tensor: ",
812  resource_tensor.dtype()));
813  }
814 
815  const ResourceHandle& resource_handle =
816  resource_tensor.scalar<ResourceHandle>()();
817 
818  if (resource_handle.container() ==
819  SessionState::kTensorHandleResourceTypeName) {
820  return session_state_.GetTensor(resource_handle.name(), retrieved_tensor);
821  } else {
822  return errors::InvalidArgument(strings::StrCat(
823  "Invalid resource type hash code: ", resource_handle.hash_code(),
824  "(name: ", resource_handle.name(),
825  " type: ", resource_handle.maybe_type_name(),
826  "). Perhaps a resource tensor was being provided as a feed? That is "
827  "not currently allowed. Please file an issue at "
828  "https://github.com/tensorflow/tensorflow/issues/new, ideally with a "
829  "short code snippet that leads to this error message."));
830  }
831 }
SessionState session_state_
Definition: NTSession.h:323
Status tensorflow::NTSession::Run ( const NamedTensorList inputs,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
std::vector< Tensor > *  outputs 
)
override

Definition at line 325 of file NTSession.cc.

Referenced by DecorateAndPublishGraphForDebug().

328  {
329  RunMetadata run_metadata;
330  return Run(RunOptions(), inputs, output_names, target_nodes, outputs,
331  &run_metadata);
332 }
::tensorflow::Status Run(const NamedTensorList &inputs, const std::vector< string > &output_names, const std::vector< string > &target_nodes, std::vector< Tensor > *outputs) override
Definition: NTSession.cc:325
::tensorflow::Status tensorflow::NTSession::Run ( const ::tensorflow::RunOptions &  run_options,
const NamedTensorList inputs,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
std::vector< Tensor > *  outputs,
RunMetadata *  run_metadata 
)
override
void tensorflow::NTSession::SchedClosure ( std::function< void()>  c)
private

Definition at line 192 of file NTSession.cc.

References EnergyCorrector::c.

Referenced by DecorateAndPublishGraphForDebug(), and PRunSetup().

192  {
193  c();
194 }
Status tensorflow::NTSession::SendPRunInputs ( const std::vector< std::pair< string, Tensor >> &  inputs,
const ExecutorsAndKeys executors_and_keys,
IntraProcessRendezvous *  rendez 
)
private

Definition at line 833 of file NTSession.cc.

References input, tensorflow::NTSession::ExecutorsAndKeys::input_name_to_rendezvous_key, ResourceHandleToInputTensor(), alignCSCRings::s, and btagGenBb_cfi::Status.

Referenced by PRun().

835  {
836  Status s;
837  Rendezvous::ParsedKey parsed;
838  // Insert the input tensors into the local rendezvous by their
839  // rendezvous key.
840  for (const auto& input : inputs) {
841  auto it =
842  executors_and_keys->input_name_to_rendezvous_key.find(input.first);
843  if (it == executors_and_keys->input_name_to_rendezvous_key.end()) {
844  return errors::Internal("'", input.first, "' is not a pre-defined feed.");
845  }
846  const string& input_key = it->second;
847 
848  s = Rendezvous::ParseKey(input_key, &parsed);
849  if (!s.ok()) {
850  rendez->StartAbort(s);
851  return s;
852  }
853 
854  if (input.second.dtype() == DT_RESOURCE) {
855  Tensor tensor_from_handle;
856  s = ResourceHandleToInputTensor(input.second, &tensor_from_handle);
857  if (s.ok()) {
858  s = rendez->Send(parsed, Rendezvous::Args(), tensor_from_handle, false);
859  }
860  } else {
861  s = rendez->Send(parsed, Rendezvous::Args(), input.second, false);
862  }
863 
864  if (!s.ok()) {
865  rendez->StartAbort(s);
866  return s;
867  }
868  }
869  return Status::OK();
870 }
::tensorflow::Status ResourceHandleToInputTensor(const Tensor &resource_tensor, Tensor *retrieved_tensor)
Definition: NTSession.cc:807
static std::string const input
Definition: EdmProvDump.cc:45
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
tensorflow::NTSession::TF_DISALLOW_COPY_AND_ASSIGN ( NTSession  )
private
tensorflow::Status tensorflow::NTSession::WaitForNotification ( Notification *  n,
int64  timeout_in_ms 
)
private

Definition at line 1465 of file NTSession.cc.

References btagGenBb_cfi::Status.

Referenced by DecorateAndPublishGraphForDebug(), PRun(), and WaitForNotification().

1466  {
1467  if (timeout_in_ms > 0) {
1468  const int64 timeout_in_us = timeout_in_ms * 1000;
1469  const bool notified =
1470  WaitForNotificationWithTimeout(notification, timeout_in_us);
1471  if (!notified) {
1472  return Status(error::DEADLINE_EXCEEDED,
1473  "Timed out waiting for notification");
1474  }
1475  } else {
1476  notification->WaitForNotification();
1477  }
1478  return Status::OK();
1479 }
std::pair< int, edm::FunctionWithDict > OK
Definition: findMethod.cc:136
void tensorflow::NTSession::WaitForNotification ( RunState run_state,
CancellationManager *  cm,
int64  timeout_in_ms 
)
private

Definition at line 1447 of file NTSession.cc.

References tensorflow::NTSession::RunState::executors_done, checklumidiff::l, tensorflow::NTSession::RunState::mu_, btagGenBb_cfi::Status, mps_update::status, and WaitForNotification().

1449  {
1450  const Status status =
1451  WaitForNotification(&run_state->executors_done, timeout_in_ms);
1452  if (!status.ok()) {
1453  {
1454  mutex_lock l(run_state->mu_);
1455  run_state->status.Update(status);
1456  }
1457  cm->StartCancel();
1458  // We must wait for the executors to complete, because they have borrowed
1459  // references to `cm` and other per-step state. After this notification, it
1460  // is safe to clean up the step.
1461  run_state->executors_done.WaitForNotification();
1462  }
1463 }
::tensorflow::Status WaitForNotification(Notification *n, int64 timeout_in_ms)
Definition: NTSession.cc:1465

Friends And Related Function Documentation

friend class DebugGateway
friend

Definition at line 366 of file NTSession.h.

Member Data Documentation

CancellationManager* tensorflow::NTSession::cancellation_manager_
private

Definition at line 326 of file NTSession.h.

Referenced by Close(), DecorateAndPublishGraphForDebug(), PRun(), PRunSetup(), and ~NTSession().

mutex tensorflow::NTSession::closed_lock_
private

Definition at line 345 of file NTSession.h.

Referenced by CheckNotClosed(), and Close().

CostModelManager tensorflow::NTSession::cost_model_manager_
private

Definition at line 359 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), and ExportCostModels().

const std::unique_ptr<const DeviceMgr> tensorflow::NTSession::device_mgr_
private
DeviceSet tensorflow::NTSession::device_set_
private
std::vector<Device*> tensorflow::NTSession::devices_
private
std::atomic<int64> tensorflow::NTSession::edge_name_counter_ = {0}
private

Definition at line 349 of file NTSession.h.

Referenced by CreateGraphs().

mutex tensorflow::NTSession::executor_lock_
private
NTSessionFactory* const tensorflow::NTSession::factory_
private

Definition at line 325 of file NTSession.h.

Referenced by Close().

std::unique_ptr<FunctionLibraryDefinition> tensorflow::NTSession::flib_def_
private

Definition at line 342 of file NTSession.h.

Referenced by CreateGraphs(), ExtendLocked(), MaybeInitializeExecutionState(), and ~NTSession().

mutex tensorflow::NTSession::graph_def_lock_
private
std::atomic<int64> tensorflow::NTSession::handle_name_counter_ = {0}
private

Definition at line 350 of file NTSession.h.

Referenced by GetOrCreateExecutors().

Status tensorflow::NTSession::init_error_
private

Definition at line 300 of file NTSession.h.

Referenced by Create().

Executor::Args::NodeOutputsCallback tensorflow::NTSession::node_outputs_callback_ = nullptr
private

Definition at line 361 of file NTSession.h.

Referenced by GetOrCreateExecutors().

const int64 tensorflow::NTSession::operation_timeout_in_ms_ = 0
private

Definition at line 356 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), PRun(), and RecvPRunOutputs().

const SessionOptions tensorflow::NTSession::options_
private
string tensorflow::NTSession::session_handle_
private

Definition at line 294 of file NTSession.h.

Referenced by GetOrCreateExecutors(), NTSession(), and ~NTSession().

SessionState tensorflow::NTSession::session_state_
private
std::atomic_int_fast64_t tensorflow::NTSession::step_id_counter_
staticprivate

Definition at line 353 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), and PRunSetup().

bool tensorflow::NTSession::sync_on_finish_ = true
private

Definition at line 303 of file NTSession.h.

Referenced by DecorateAndPublishGraphForDebug(), NTSession(), and PRunSetup().