
tensorflow::TBBSession Class Reference

#include <TBBSession.h>

Inheritance diagram for tensorflow::TBBSession: inherits Session.

Classes

struct  ExecutorsAndKeys
 
struct  PerPartitionExecutorsAndLib
 
struct  RunState
 
struct  RunStateArgs
 

Public Types

typedef std::function< void(Session *)> CloseCallback
 
typedef std::vector< std::pair< string, Tensor > > NamedTensorList
 
typedef std::unordered_map< StringPiece, Node *, StringPieceHasher > NameNodeMap
 

Public Member Functions

::tensorflow::Status Close () override
 
::tensorflow::Status Create (const GraphDef &graph) override
 
void ExportCostModels (CostModelManager::CostModelMap *cost_models)
 
::tensorflow::Status Extend (const GraphDef &graph) override
 
::tensorflow::Status ListDevices (std::vector< DeviceAttributes > *response) override
 
::tensorflow::Status LocalDeviceManager (const DeviceMgr **output) override
 
::tensorflow::Status Reset (const std::vector< string > &containers)
 
::tensorflow::Status Run (const NamedTensorList &inputs, const std::vector< string > &output_names, const std::vector< string > &target_nodes, std::vector< Tensor > *outputs) override
 
::tensorflow::Status Run (const ::tensorflow::RunOptions &run_options, const NamedTensorList &inputs, const std::vector< string > &output_names, const std::vector< string > &target_nodes, std::vector< Tensor > *outputs, RunMetadata *run_metadata) override
 
 TBBSession (const SessionOptions &options, const DeviceMgr *device_mgr, TBBSessionFactory *factory)
 
 ~TBBSession () override
 

Private Member Functions

::tensorflow::Status CheckNotClosed ()
 
::tensorflow::Status CreateDebuggerState (const DebugOptions &debug_options, int64 session_run_index, int64 executor_step_index, const std::vector< string > &input_names, const std::vector< string > &output_names, const std::vector< string > &target_names, std::unique_ptr< DebuggerStateInterface > *debugger_state)
 
::tensorflow::Status CreateGraphs (const BuildGraphOptions &options, std::unordered_map< string, std::unique_ptr< Graph >> *outputs, std::unique_ptr< FunctionLibraryDefinition > *flib_def, RunStateArgs *run_state_args, DataTypeVector *input_types, DataTypeVector *output_types)
 
::tensorflow::Status DecorateAndPublishGraphForDebug (const DebugOptions &debug_options, Graph *graph, Device *device)
 
::tensorflow::Status ExtendLocked (const GraphDef &graph) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
 
::tensorflow::Status GetOrCreateExecutors (gtl::ArraySlice< string > inputs, gtl::ArraySlice< string > outputs, gtl::ArraySlice< string > target_nodes, ExecutorsAndKeys **executors_and_keys, RunStateArgs *run_state_args)
 
bool graph_created_ GUARDED_BY (graph_def_lock_)
 
GraphDef graph_def_ GUARDED_BY (graph_def_lock_)
 
std::unordered_map< string, std::shared_ptr< ExecutorsAndKeys > > executors_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, std::unique_ptr< RunState > > partial_runs_ GUARDED_BY (executor_lock_)
 
std::unordered_map< string, string > stateful_placements_ GUARDED_BY (graph_def_lock_)
 
std::unique_ptr< GraphExecutionState > execution_state_ GUARDED_BY (graph_def_lock_)
 
bool closed_ GUARDED_BY (closed_lock_)
 
Status MaybeInitializeExecutionState (const GraphDef &graph, bool *out_already_initialized) EXCLUSIVE_LOCKS_REQUIRED(graph_def_lock_)
 
::tensorflow::Status ResourceHandleToInputTensor (const Tensor &resource_tensor, Tensor *retrieved_tensor)
 
void SchedClosure (tbb::task_arena &arena, tbb::task_group &g, std::function< void()> c)
 
 TF_DISALLOW_COPY_AND_ASSIGN (TBBSession)
 
::tensorflow::Status WaitForNotification (Notification *n, int64 timeout_in_ms)
 
void WaitForNotification (tbb::task_arena &arena, tbb::task_group &group, RunState *run_state, CancellationManager *cm, int64 timeout_in_ms)
 

Private Attributes

CancellationManager * cancellation_manager_
 
mutex closed_lock_
 
CostModelManager cost_model_manager_
 
const std::unique_ptr< const DeviceMgr > device_mgr_
 
DeviceSet device_set_
 
std::vector< Device * > devices_
 
std::atomic< int64 > edge_name_counter_ = {0}
 
mutex executor_lock_
 
TBBSessionFactory *const factory_
 
std::unique_ptr< FunctionLibraryDefinition > flib_def_
 
mutex graph_def_lock_
 
std::atomic< int64 > handle_name_counter_ = {0}
 
Status init_error_
 
Executor::Args::NodeOutputsCallback node_outputs_callback_ = nullptr
 
const int64 operation_timeout_in_ms_ = 0
 
const SessionOptions options_
 
string session_handle_
 
SessionState session_state_
 
bool sync_on_finish_ = true
 

Static Private Attributes

static std::atomic_int_fast64_t step_id_counter_
 

Friends

class DebugGateway
 

Detailed Description

Definition at line 81 of file TBBSession.h.
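
The source carries no further prose description. For orientation: TBBSession implements the standard tensorflow::Session interface (Create/Extend/Run/Close) and drives the per-partition executors through Intel TBB task arenas, closely following TensorFlow's DirectSession. Below is a minimal usage sketch through the generic Session interface; setting the target to "tbb" to select the factory is an assumption suggested by the session handle set in the constructor, and the file and node names are placeholders.

  // Sketch only; needs tensorflow/core/public/session.h and
  // tensorflow/core/platform/env.h.
  tensorflow::SessionOptions opts;
  opts.target = "tbb";  // assumption: routes to the registered TBBSessionFactory
  tensorflow::Session* session = nullptr;
  TF_CHECK_OK(tensorflow::NewSession(opts, &session));

  tensorflow::GraphDef graph_def;
  TF_CHECK_OK(tensorflow::ReadBinaryProto(tensorflow::Env::Default(),
                                          "graph.pb", &graph_def));
  TF_CHECK_OK(session->Create(graph_def));

  tensorflow::Tensor x(tensorflow::DT_FLOAT, tensorflow::TensorShape({1, 10}));
  x.flat<float>().setZero();
  std::vector<tensorflow::Tensor> outputs;
  TF_CHECK_OK(session->Run({{"input", x}}, {"output"}, {}, &outputs));
  TF_CHECK_OK(session->Close());
  delete session;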

Member Typedef Documentation

typedef std::function<void(Session*)> tensorflow::TBBSession::CloseCallback

Definition at line 83 of file TBBSession.h.

typedef std::vector<std::pair<string, Tensor> > tensorflow::TBBSession::NamedTensorList

Definition at line 93 of file TBBSession.h.
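
A NamedTensorList is simply a vector of (node name, tensor) pairs used as feeds, e.g. (the node name "input" is a placeholder):

  tensorflow::Tensor t(tensorflow::DT_FLOAT, tensorflow::TensorShape({2}));
  t.vec<float>()(0) = 1.f;
  t.vec<float>()(1) = 2.f;
  tensorflow::TBBSession::NamedTensorList feeds = {{"input", t}};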

typedef std::unordered_map<StringPiece, Node*, StringPieceHasher> tensorflow::TBBSession::NameNodeMap

Definition at line 94 of file TBBSession.h.

Constructor & Destructor Documentation

tensorflow::TBBSession::TBBSession ( const SessionOptions &  options,
const DeviceMgr *  device_mgr,
TBBSessionFactory *  factory 
)

Definition at line 198 of file TBBSession.cc.

References device_mgr_, device_set_, devices_, LOG, session_handle_, and sync_on_finish_.

201  : options_(options),
202  device_mgr_(device_mgr),
203  factory_(factory),
204  cancellation_manager_(new CancellationManager()),
205  operation_timeout_in_ms_(options_.config.operation_timeout_in_ms()) {
206  // The default value of sync_on_finish will be flipped soon and this
207  // environment variable will be removed as well.
208  const Status status =
209  ReadBoolFromEnvVar("TF_SYNC_ON_FINISH", true, &sync_on_finish_);
210  if (!status.ok()) {
211  LOG(ERROR) << status.error_message();
212  }
213  // NOTE(mrry): We do not need to use a unique string for the session
214  // handle, because TBBSession owns its devices. This may change
215  // in future versions.
216  session_handle_ = "tbb";
217  int devices_added = 0;
218  if (options.config.log_device_placement()) {
219  const string mapping_str = device_mgr_->DeviceMappingString();
220  if (mapping_str.empty()) {
221  printf("Device mapping: no known devices.\n");
222  } else {
223  printf("Device mapping:\n%s", mapping_str.c_str());
224  }
225  LOG(INFO) << "Device mapping:\n" << mapping_str;
226  }
227  for (auto d : device_mgr_->ListDevices()) {
228  devices_.push_back(d);
229  device_set_.AddDevice(d);
230  d->op_segment()->AddHold(session_handle_);
231 
232  // The first device added is special: it is the 'client device' (a
233  // CPU device) from which we feed and fetch Tensors.
234  if (devices_added == 0) {
235  device_set_.set_client_device(d);
236  }
237  ++devices_added;
238  }
239 }
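
The "Device mapping" printout above is gated by the session configuration; a client enables it when building the options, e.g.:

  tensorflow::SessionOptions opts;
  opts.config.set_log_device_placement(true);  // produces the "Device mapping:" output
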
tensorflow::TBBSession::~TBBSession ( )
override

Definition at line 241 of file TBBSession.cc.

References cancellation_manager_, Close(), device_mgr_, flib_def_, and session_handle_.

241  {
242  if (!closed_) Close().IgnoreError();
243  for (auto& it : partial_runs_) {
244  it.second.reset(nullptr);
245  }
246  for (auto& it : executors_) {
247  it.second.reset();
248  }
249  for (auto d : device_mgr_->ListDevices()) {
250  d->op_segment()->RemoveHold(session_handle_);
251  }
252  delete cancellation_manager_;
253 
254  execution_state_.reset(nullptr);
255  flib_def_.reset(nullptr);
256 }

Member Function Documentation

::tensorflow::Status tensorflow::TBBSession::CheckNotClosed ( )
inline private

Definition at line 243 of file TBBSession.h.

References closed_lock_.

Referenced by DecorateAndPublishGraphForDebug(), and Extend().

243  {
244  mutex_lock l(closed_lock_);
245  if (closed_) return errors::Cancelled("Session has been closed.");
246  return ::tensorflow::Status::OK();
247  }
tensorflow::Status tensorflow::TBBSession::Close ( )
override

Definition at line 1074 of file TBBSession.cc.

References cancellation_manager_, closed_lock_, tensorflow::TBBSessionFactory::Deregister(), and factory_.

Referenced by ~TBBSession().

1074  {
1075  cancellation_manager_->StartCancel();
1076  {
1077  mutex_lock l(closed_lock_);
1078  if (closed_) return ::tensorflow::Status::OK();
1079  closed_ = true;
1080  }
1081  if (factory_ != nullptr) factory_->Deregister(this);
1082  return ::tensorflow::Status::OK();
1083 }
Status tensorflow::TBBSession::Create ( const GraphDef &  graph)
override

Definition at line 289 of file TBBSession.cc.

References ExtendLocked(), graph_def_lock_, and init_error_.

289  {
290  TF_RETURN_IF_ERROR(init_error_);
291  if (graph.node_size() > 0) {
292  mutex_lock l(graph_def_lock_);
293  if (graph_created_) {
294  return errors::AlreadyExists(
295  "A Graph has already been created for this session.");
296  }
297  return ExtendLocked(graph);
298  }
299  return Status::OK();
300 }
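
Create() accepts a non-empty graph only once per session; a second call fails with errors::AlreadyExists, and later additions go through Extend(). A short sketch (both GraphDefs assumed to be filled elsewhere):

  tensorflow::GraphDef base_graph, extra_nodes;
  TF_CHECK_OK(session->Create(base_graph));
  // Another Create() would now fail with "A Graph has already been
  // created for this session."; Extend() merges instead:
  TF_CHECK_OK(session->Extend(extra_nodes));
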
Status tensorflow::TBBSession::CreateDebuggerState ( const DebugOptions &  debug_options,
int64  session_run_index,
int64  executor_step_index,
const std::vector< string > &  input_names,
const std::vector< string > &  output_names,
const std::vector< string > &  target_names,
std::unique_ptr< DebuggerStateInterface > *  debugger_state 
)
private

Definition at line 332 of file TBBSession.cc.

Referenced by DecorateAndPublishGraphForDebug().

337  {
338  TF_RETURN_IF_ERROR(
339  DebuggerStateRegistry::CreateState(debug_options, debugger_state));
340  TF_RETURN_IF_ERROR(debugger_state->get()->PublishDebugMetadata(
341  debug_options.global_step(), session_run_index, executor_step_index,
342  input_names, output_names, target_names));
343  return Status::OK();
344 }
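
The debug options consumed here arrive via RunOptions; a hedged sketch of a client requesting a tfdbg tensor watch (node name and dump URL are placeholders):

  tensorflow::RunOptions run_options;
  auto* watch =
      run_options.mutable_debug_options()->add_debug_tensor_watch_opts();
  watch->set_node_name("dense/BiasAdd");  // node to watch
  watch->add_debug_ops("DebugIdentity");  // debug op inserted into the graph
  watch->add_debug_urls("file:///tmp/tfdbg_dump");
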
Status tensorflow::TBBSession::CreateGraphs ( const BuildGraphOptions &  options,
std::unordered_map< string, std::unique_ptr< Graph >> *  outputs,
std::unique_ptr< FunctionLibraryDefinition > *  flib_def,
RunStateArgs *  run_state_args,
DataTypeVector *  input_types,
DataTypeVector *  output_types 
)
private

Definition at line 902 of file TBBSession.cc.

References device_mgr_, device_set_, devices_, edge_name_counter_, flib_def_, tensorflow::TBBSession::RunStateArgs::graph, graph_def_lock_, tensorflow::TBBSession::RunStateArgs::is_partial_run, options_, and std::swap().

Referenced by GetOrCreateExecutors().

907  {
908  mutex_lock l(graph_def_lock_);
909  std::unique_ptr<ClientGraph> client_graph;
910 
911  std::unique_ptr<GraphExecutionState> temp_exec_state_holder;
912  GraphExecutionState* execution_state = nullptr;
913  if (options_.config.graph_options().place_pruned_graph()) {
914  // Because we are placing pruned graphs, we need to create a
915  // new GraphExecutionState for every new unseen graph,
916  // and then place it.
917  GraphExecutionStateOptions prune_options;
918  prune_options.device_set = &device_set_;
919  prune_options.session_options = &options_;
920  prune_options.stateful_placements = stateful_placements_;
921  TF_RETURN_IF_ERROR(GraphExecutionState::MakeForPrunedGraph(
922  execution_state_->original_graph_def().library(), prune_options,
923  execution_state_->original_graph_def(), subgraph_options,
924  &temp_exec_state_holder, &client_graph));
925  execution_state = temp_exec_state_holder.get();
926  } else {
927  execution_state = execution_state_.get();
928  TF_RETURN_IF_ERROR(
929  execution_state->BuildGraph(subgraph_options, &client_graph));
930  }
931 
932  if (subgraph_options.feed_endpoints.size() !=
933  client_graph->feed_types.size()) {
934  return errors::Internal(
935  "Graph pruning failed: requested number of feed endpoints = ",
936  subgraph_options.feed_endpoints.size(),
937  " versus number of pruned feed endpoints = ",
938  client_graph->feed_types.size());
939  }
940  if (subgraph_options.fetch_endpoints.size() !=
941  client_graph->fetch_types.size()) {
942  return errors::Internal(
943  "Graph pruning failed: requested number of fetch endpoints = ",
944  subgraph_options.fetch_endpoints.size(),
945  " versus number of pruned fetch endpoints = ",
946  client_graph->fetch_types.size());
947  }
948 
949  auto current_stateful_placements = execution_state->GetStatefulPlacements();
950  // Update our current state based on the execution_state's
951  // placements. If there are any mismatches for a node,
952  // we should fail, as this should never happen.
953  for (auto placement_pair : current_stateful_placements) {
954  const string& node_name = placement_pair.first;
955  const string& placement = placement_pair.second;
956  auto iter = stateful_placements_.find(node_name);
957  if (iter == stateful_placements_.end()) {
958  stateful_placements_.insert(std::make_pair(node_name, placement));
959  } else if (iter->second != placement) {
960  return errors::Internal(
961  "Stateful placement mismatch. "
962  "Current assignment of ",
963  node_name, " to ", iter->second, " does not match ", placement);
964  }
965  }
966 
967  stateful_placements_ = execution_state->GetStatefulPlacements();
968 
969  // Remember the graph in run state if this is a partial run.
970  if (run_state_args->is_partial_run) {
971  run_state_args->graph.reset(new Graph(flib_def_.get()));
972  CopyGraph(*execution_state->full_graph(), run_state_args->graph.get());
973  }
974 
975  // Partition the graph across devices.
976  PartitionOptions popts;
977  popts.node_to_loc = [](const Node* node) {
978  assert(node != nullptr);
979  return node->assigned_device_name();
980  };
981  popts.new_name = [this](const string& prefix) {
982  return strings::StrCat(prefix, "/_", edge_name_counter_.fetch_add(1));
983  };
984  popts.get_incarnation = [](const string& name) {
985  // The direct session does not have changing incarnation numbers.
986  // Just return '1'.
987  return 1;
988  };
989  popts.flib_def = &client_graph->graph.flib_def();
990  popts.control_flow_added = false;
991 
992  std::unordered_map<string, GraphDef> partitions;
993  TF_RETURN_IF_ERROR(Partition(popts, &client_graph->graph, &partitions));
994 
995  std::vector<string> device_names;
996  for (auto device : devices_) {
997  // Extract the LocalName from the device.
998  device_names.push_back(DeviceNameUtils::LocalName(device->name()));
999  }
1000 
1001  // Check for valid partitions.
1002  for (const auto& partition : partitions) {
1003  const string local_partition_name =
1004  DeviceNameUtils::LocalName(partition.first);
1005  if (std::count(device_names.begin(), device_names.end(),
1006  local_partition_name) == 0) {
1007  return errors::InvalidArgument(
1008  "Creating a partition for ", local_partition_name,
1009  " which doesn't exist in the list of available devices. Available "
1010  "devices: ",
1011  str_util::Join(device_names, ","));
1012  }
1013  }
1014 
1015  for (const auto& partition : partitions) {
1016  std::unique_ptr<Graph> device_graph(
1017  new Graph(client_graph->flib_def.get()));
1018  GraphConstructorOptions device_opts;
1019  // There are internal operations (e.g., send/recv) that we now allow.
1020  device_opts.allow_internal_ops = true;
1021  device_opts.expect_device_spec = true;
1022  TF_RETURN_IF_ERROR(ConvertGraphDefToGraph(device_opts, partition.second,
1023  device_graph.get()));
1024  outputs->emplace(partition.first, std::move(device_graph));
1025  }
1026 
1027  GraphOptimizationPassOptions optimization_options;
1028  optimization_options.session_options = &options_;
1029  optimization_options.flib_def = client_graph->flib_def.get();
1030  optimization_options.partition_graphs = outputs;
1031  TF_RETURN_IF_ERROR(OptimizationPassRegistry::Global()->RunGrouping(
1032  OptimizationPassRegistry::POST_PARTITIONING, optimization_options));
1033 
1034  Status s;
1035  for (auto& partition : *outputs) {
1036  const string& partition_name = partition.first;
1037  std::unique_ptr<Graph>* graph = &partition.second;
1038 
1039  VLOG(2) << "Created " << DebugString(graph->get()) << " for "
1040  << partition_name;
1041 
1042  // Give the device an opportunity to rewrite its subgraph.
1043  Device* d;
1044  s = device_mgr_->LookupDevice(partition_name, &d);
1045  if (!s.ok()) break;
1046  s = d->MaybeRewriteGraph(graph);
1047  if (!s.ok()) {
1048  break;
1049  }
1050  }
1051  *flib_def = std::move(client_graph->flib_def);
1052  std::swap(*input_types, client_graph->feed_types);
1053  std::swap(*output_types, client_graph->fetch_types);
1054  return s;
1055 }
Status tensorflow::TBBSession::DecorateAndPublishGraphForDebug ( const DebugOptions &  debug_options,
Graph *  graph,
Device *  device 
)
private

Definition at line 346 of file TBBSession.cc.

References cancellation_manager_, CheckNotClosed(), cost_model_manager_, CreateDebuggerState(), device_mgr_, devices_, executor_lock_, tensorflow::TBBSession::RunState::executors_done, tensorflow::TBBSession::PerPartitionExecutorsAndLib::flib, GetOrCreateExecutors(), tensorflow::TBBSession::PerPartitionExecutorsAndLib::graph, graph_def_lock_, tensorflow::TBBSession::ExecutorsAndKeys::input_name_to_index, tensorflow::TBBSession::ExecutorsAndKeys::input_types, tensorflow::TBBSession::ExecutorsAndKeys::items, operation_timeout_in_ms_, options_, tensorflow::TBBSession::ExecutorsAndKeys::output_types, tensorflow::TBBSession::RunState::rendez, ResourceHandleToInputTensor(), Run(), SchedClosure(), session_state_, tensorflow::TBBSession::ExecutorsAndKeys::step_count, step_id_counter_, sync_on_finish_, and WaitForNotification().

Referenced by GetOrCreateExecutors().

347  {
348  std::unique_ptr<DebugGraphDecoratorInterface> decorator;
349  TF_RETURN_IF_ERROR(
350  DebugGraphDecoratorRegistry::CreateDecorator(debug_options, &decorator));
351 
352  TF_RETURN_IF_ERROR(decorator->DecorateGraph(graph, device));
353  TF_RETURN_IF_ERROR(decorator->PublishGraph(*graph, device->name()));
354  return Status::OK();
355 }
void tensorflow::TBBSession::ExportCostModels ( CostModelManager::CostModelMap *  cost_models)
inline

Definition at line 123 of file TBBSession.h.

123  {
124  cost_model_manager_.ExportCostModels(cost_models);
125  }
Status tensorflow::TBBSession::Extend ( const GraphDef &  graph)
override

Definition at line 302 of file TBBSession.cc.

References CheckNotClosed(), ExtendLocked(), and graph_def_lock_.

302  {
303  TF_RETURN_IF_ERROR(CheckNotClosed());
304  mutex_lock l(graph_def_lock_);
305  return ExtendLocked(graph);
306 }
Status tensorflow::TBBSession::ExtendLocked ( const GraphDef &  graph)
private

Definition at line 308 of file TBBSession.cc.

References flib_def_, and MaybeInitializeExecutionState().

Referenced by Create(), and Extend().

308  {
309  bool already_initialized;
310  // If this is the first call, we can initialize the execution state
311  // with `graph` and do not need to call `Extend()`.
312  TF_RETURN_IF_ERROR(
313  MaybeInitializeExecutionState(graph, &already_initialized));
314  if (already_initialized) {
315  TF_RETURN_IF_ERROR(flib_def_->AddLibrary(graph.library()));
316  std::unique_ptr<GraphExecutionState> state;
317  TF_RETURN_IF_ERROR(execution_state_->Extend(graph, &state));
318  execution_state_.swap(state);
319  }
320  return Status::OK();
321 }
Status tensorflow::TBBSession::GetOrCreateExecutors ( gtl::ArraySlice< string >  inputs,
gtl::ArraySlice< string >  outputs,
gtl::ArraySlice< string >  target_nodes,
ExecutorsAndKeys **  executors_and_keys,
RunStateArgs *  run_state_args 
)
private

Definition at line 663 of file TBBSession.cc.

References CreateGraphs(), tensorflow::TBBSession::RunStateArgs::debug_options, DecorateAndPublishGraphForDebug(), device_mgr_, device_set_, executor_lock_, tensorflow::TBBSession::RunStateArgs::graph, graph_def_lock_, tensorflow::TBBSession::RunStateArgs::handle, handle_name_counter_, tensorflow::TBBSession::RunStateArgs::is_partial_run, node_outputs_callback_, options_, and session_handle_.

Referenced by DecorateAndPublishGraphForDebug().

666  {
667  int64 handle_name_counter_value = -1;
668  if (LogMemory::IsEnabled() || run_state_args->is_partial_run) {
669  handle_name_counter_value = handle_name_counter_.fetch_add(1);
670  }
671 
672  string debug_tensor_watches_summary;
673  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
674  debug_tensor_watches_summary = SummarizeDebugTensorWatches(
675  run_state_args->debug_options.debug_tensor_watch_opts());
676  }
677 
678  // Fast lookup path, no sorting.
679  const string key = strings::StrCat(
680  str_util::Join(inputs, ","), "->", str_util::Join(outputs, ","), "/",
681  str_util::Join(target_nodes, ","), "/", run_state_args->is_partial_run,
682  "/", debug_tensor_watches_summary);
683  // Set the handle, if it's needed to log memory or for partial run.
684  if (handle_name_counter_value >= 0) {
685  run_state_args->handle =
686  strings::StrCat(key, ";", handle_name_counter_value);
687  }
688 
689  // See if we already have the executors for this run.
690  {
691  mutex_lock l(executor_lock_); // could use reader lock
692  auto it = executors_.find(key);
693  if (it != executors_.end()) {
694  *executors_and_keys = it->second.get();
695  return Status::OK();
696  }
697  }
698 
699  // Slow lookup path, the unsorted key missed the cache.
700  // Sort the inputs and outputs, and look up with the sorted key in case an
701  // earlier call used a different order of inputs and outputs.
702  //
703  // We could consider some other signature instead of sorting that
704  // preserves the same property to avoid the sort in the future.
705  std::vector<string> inputs_sorted(inputs.begin(), inputs.end());
706  std::sort(inputs_sorted.begin(), inputs_sorted.end());
707  std::vector<string> outputs_sorted(outputs.begin(), outputs.end());
708  std::sort(outputs_sorted.begin(), outputs_sorted.end());
709  std::vector<string> tn_sorted(target_nodes.begin(), target_nodes.end());
710  std::sort(tn_sorted.begin(), tn_sorted.end());
711 
712  const string sorted_key = strings::StrCat(
713  str_util::Join(inputs_sorted, ","), "->",
714  str_util::Join(outputs_sorted, ","), "/", str_util::Join(tn_sorted, ","),
715  "/", run_state_args->is_partial_run, "/", debug_tensor_watches_summary);
716  // Set the handle, if it's needed to log memory or for partial run.
717  if (handle_name_counter_value >= 0) {
718  run_state_args->handle =
719  strings::StrCat(sorted_key, ";", handle_name_counter_value);
720  }
721 
722  // See if we already have the executors for this run.
723  {
724  mutex_lock l(executor_lock_);
725  auto it = executors_.find(sorted_key);
726  if (it != executors_.end()) {
727  *executors_and_keys = it->second.get();
728  // Insert this under the original key.
729  executors_.emplace(key, it->second);
730  return Status::OK();
731  }
732  }
733 
734  // Nothing found, so create the executors and store in the cache.
735  BuildGraphOptions options;
736  options.feed_endpoints = inputs_sorted;
737  options.fetch_endpoints = outputs_sorted;
738  options.target_nodes = tn_sorted;
739  options.use_function_convention = !run_state_args->is_partial_run;
740  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
741  options.debug_options = run_state_args->debug_options;
742  }
743 
744  std::shared_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys);
745 
746  // The executor_lock_ is intentionally released while executor is
747  // being created.
748  std::unordered_map<string, std::unique_ptr<Graph>> graphs;
749  TF_RETURN_IF_ERROR(CreateGraphs(options, &graphs, &ek->flib_def,
750  run_state_args, &ek->input_types,
751  &ek->output_types));
752 
753  if (run_state_args->is_partial_run) {
754  ek->graph = std::move(run_state_args->graph);
755  std::unordered_set<StringPiece, StringPieceHasher> names;
756  for (const string& input : inputs) {
757  TensorId id(ParseTensorName(input));
758  names.emplace(id.first);
759  }
760  for (const string& output : outputs) {
761  TensorId id(ParseTensorName(output));
762  names.emplace(id.first);
763  }
764  for (Node* n : ek->graph->nodes()) {
765  if (names.count(n->name()) > 0) {
766  ek->name_to_node.insert({n->name(), n});
767  }
768  }
769  }
770  ek->items.reserve(graphs.size());
771  const auto& optimizer_opts =
772  options_.config.graph_options().optimizer_options();
773 
774  int graph_def_version;
775  {
776  mutex_lock l(graph_def_lock_);
777  graph_def_version =
778  execution_state_->original_graph_def().versions().producer();
779  }
780  ek->proc_flr.reset(new ProcessFunctionLibraryRuntime(
781  device_mgr_.get(), options_.env, graph_def_version, ek->flib_def.get(),
782  optimizer_opts));
783 
784  GraphOptimizer optimizer(optimizer_opts);
785  for (auto iter = graphs.begin(); iter != graphs.end(); ++iter) {
786  const string& partition_name = iter->first;
787  std::unique_ptr<Graph>& partition_graph = iter->second;
788 
789  Device* device;
790  TF_RETURN_IF_ERROR(device_mgr_->LookupDevice(partition_name, &device));
791 
792  ek->items.resize(ek->items.size() + 1);
793  auto* item = &(ek->items.back());
794  auto lib = ek->proc_flr->GetFLR(partition_name);
795  if (lib == nullptr) {
796  return errors::Internal("Could not find device: ", partition_name);
797  }
798  item->flib = lib;
799 
800  LocalExecutorParams params;
801  params.device = device;
802  params.function_library = lib;
803  auto opseg = device->op_segment();
804  params.create_kernel = [this, lib, opseg](const NodeDef& ndef,
805  OpKernel** kernel) {
806  // We do not share the kernel via the OpSegment if the node is
807  // stateless, or a function.
808  // NOTE(mrry): We must not share function kernels (implemented
809  // using `CallOp`) between subgraphs, because `CallOp::handle_`
810  // is tied to a particular subgraph. Even if the function itself
811  // is stateful, the `CallOp` that invokes it is not.
812  if (!lib->IsStateful(ndef.op()) ||
813  lib->GetFunctionLibraryDefinition()->Find(ndef.op()) != nullptr) {
814  return lib->CreateKernel(ndef, kernel);
815  }
816  auto create_fn = [lib, &ndef](OpKernel** kernel) {
817  return lib->CreateKernel(ndef, kernel);
818  };
819  // Kernels created for subgraph nodes need to be cached. On
820  // cache miss, create_fn() is invoked to create a kernel based
821  // on the function library here + global op registry.
822  return opseg->FindOrCreate(session_handle_, ndef.name(), kernel,
823  create_fn);
824  };
825  params.delete_kernel = [lib](OpKernel* kernel) {
826  // If the node is stateful, opseg owns it. Otherwise, delete it.
827  if (kernel && !lib->IsStateful(kernel->type_string())) {
828  delete kernel;
829  }
830  };
831  params.node_outputs_cb = node_outputs_callback_;
832 
833  optimizer.Optimize(lib, options_.env, device, &iter->second,
834  /*shape_map=*/nullptr);
835 
836  // EXPERIMENTAL: tfdbg inserts debug nodes in the graph.
837  if (!options.debug_options.debug_tensor_watch_opts().empty()) {
838  TF_RETURN_IF_ERROR(DecorateAndPublishGraphForDebug(
839  options.debug_options, partition_graph.get(), params.device));
840  }
841 
842  TF_RETURN_IF_ERROR(EnsureMemoryTypes(DeviceType(device->device_type()),
843  device->name(),
844  partition_graph.get()));
845  // NewLocalExecutor takes ownership of partition_graph.
846  item->graph = partition_graph.get();
847  item->executor = nullptr;
848  item->device = device;
849  Executor* executor;
850  TF_RETURN_IF_ERROR(
851  NewLocalExecutor(params, partition_graph.release(), &executor));
852  item->executor.reset(executor);
853  }
854 
855  // Cache the mapping from input/output names to graph elements to
856  // avoid recomputing it every time.
857  if (!run_state_args->is_partial_run) {
858  // For regular `Run()`, we use the function calling convention, and so
859  // maintain a mapping from input/output names to
860  // argument/return-value ordinal index.
861  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
862  const string& input = inputs_sorted[i];
863  ek->input_name_to_index[input] = i;
864  }
865  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
866  const string& output = outputs_sorted[i];
867  ek->output_name_to_index[output] = i;
868  }
869  } else {
870  // For `PRun()`, we use the rendezvous calling convention, and so
871  // maintain a mapping from input/output names to rendezvous keys.
872  //
873  // We always use the first device as the device name portion of the
874  // key, even if we're feeding another graph.
875  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
876  const string& input = inputs_sorted[i];
877  ek->input_name_to_rendezvous_key[input] = GetRendezvousKey(
878  input, device_set_.client_device()->attributes(), FrameAndIter(0, 0));
879  }
880  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
881  const string& output = outputs_sorted[i];
882  ek->output_name_to_rendezvous_key[output] =
883  GetRendezvousKey(output, device_set_.client_device()->attributes(),
884  FrameAndIter(0, 0));
885  }
886  }
887 
888  // Reacquire the lock, try to insert into the map.
889  mutex_lock l(executor_lock_);
890 
891  // Another thread may have created the entry before us, in which case we will
892  // reuse the already created one.
893  auto insert_result = executors_.emplace(sorted_key, ek);
894  // Insert the value under the original key, so the fast path lookup will work
895  // if the user uses the same order of inputs, outputs, and targets again.
896  executors_.emplace(key, insert_result.first->second);
897  *executors_and_keys = insert_result.first->second.get();
898 
899  return Status::OK();
900 }
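
For illustration, the executor cache key is a plain string concatenation; for inputs {"a", "b"}, outputs {"c"}, target nodes {"t"}, no partial run, and no debug watches, both the fast (original order) and slow (sorted) paths yield the same key:

  // inputs -> outputs / targets / is_partial_run / debug watch summary
  //   "a,b->c/t/0/"
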
bool graph_created_ tensorflow::TBBSession::GUARDED_BY ( graph_def_lock_  )
private
GraphDef graph_def_ tensorflow::TBBSession::GUARDED_BY ( graph_def_lock_  )
private
std::unordered_map<string, std::shared_ptr<ExecutorsAndKeys> > executors_ tensorflow::TBBSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, std::unique_ptr<RunState> > partial_runs_ tensorflow::TBBSession::GUARDED_BY ( executor_lock_  )
private
std::unordered_map<string, string> stateful_placements_ tensorflow::TBBSession::GUARDED_BY ( graph_def_lock_  )
private
std::unique_ptr<GraphExecutionState> execution_state_ tensorflow::TBBSession::GUARDED_BY ( graph_def_lock_  )
private
bool closed_ tensorflow::TBBSession::GUARDED_BY ( closed_lock_  )
private
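
These entries are Doxygen's rendering of Clang thread-safety annotations on the data members. A hedged reconstruction of how a few of them read in TBBSession.h (initializers are assumptions):

  mutex graph_def_lock_;
  bool graph_created_ GUARDED_BY(graph_def_lock_) = false;  // set once a graph is attached
  GraphDef graph_def_ GUARDED_BY(graph_def_lock_);
  mutex executor_lock_;  // protects the executor cache
  std::unordered_map<string, std::shared_ptr<ExecutorsAndKeys>> executors_
      GUARDED_BY(executor_lock_);
  mutex closed_lock_;
  bool closed_ GUARDED_BY(closed_lock_) = false;
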
tensorflow::Status tensorflow::TBBSession::ListDevices ( std::vector< DeviceAttributes > *  response)
override

Definition at line 1057 of file TBBSession.cc.

References devices_.

1058  {
1059  response->clear();
1060  response->reserve(devices_.size());
1061  for (Device* d : devices_) {
1062  const DeviceAttributes& attrs = d->attributes();
1063  response->emplace_back(attrs);
1064  }
1065  return ::tensorflow::Status::OK();
1066 }
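
Querying the device list through the public interface, as a sketch:

  std::vector<tensorflow::DeviceAttributes> devices;
  TF_CHECK_OK(session->ListDevices(&devices));
  for (const auto& d : devices) {
    std::cout << d.name() << " (" << d.device_type() << ")\n";
  }
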
::tensorflow::Status tensorflow::TBBSession::LocalDeviceManager ( const DeviceMgr **  output)
inline override

Definition at line 118 of file TBBSession.h.

118  {
119  *output = device_mgr_.get();
120  return ::tensorflow::Status::OK();
121  }
Status tensorflow::TBBSession::MaybeInitializeExecutionState ( const GraphDef &  graph,
bool *  out_already_initialized 
)
private

Definition at line 258 of file TBBSession.cc.

References device_set_, flib_def_, and options_.

Referenced by ExtendLocked().

259  {
260  // If already initialized, do nothing.
261  if (flib_def_ && execution_state_) {
262  *out_already_initialized = true;
263  return Status::OK();
264  }
265  // Set up the per-session execution state.
266  // NOTE(mrry): The function library created here will be used for
267  // all subsequent extensions of the graph.
268  flib_def_.reset(
269  new FunctionLibraryDefinition(OpRegistry::Global(), graph.library()));
270  GraphExecutionStateOptions options;
271  options.device_set = &device_set_;
272  options.session_options = &options_;
273  // TODO(mrry,suharshs): We explicitly copy `graph` so that
274  // `MakeForBaseGraph()` can take ownership of its
275  // contents. Previously this happened implicitly in calls to the
276  // `GraphExecutionState`. Other sessions call
277  // `MakeForBaseGraph` in such a way that we can destructively read
278  // the passed-in `GraphDef`. In principle we could do the same here,
279  // with a wider refactoring; we might revise the direct session so
280  // that it copies the graph fewer times.
281  GraphDef temp(graph);
282  TF_RETURN_IF_ERROR(
283  GraphExecutionState::MakeForBaseGraph(&temp, options, &execution_state_));
284  graph_created_ = true;
285  *out_already_initialized = false;
286  return Status::OK();
287 }
tensorflow::Status tensorflow::TBBSession::Reset ( const std::vector< string > &  containers)

Definition at line 1068 of file TBBSession.cc.

References device_mgr_.

1069  {
1070  device_mgr_->ClearContainers(containers);
1071  return ::tensorflow::Status::OK();
1072 }
Status tensorflow::TBBSession::ResourceHandleToInputTensor ( const Tensor &  resource_tensor,
Tensor *  retrieved_tensor 
)
private

Definition at line 637 of file TBBSession.cc.

References session_state_.

Referenced by DecorateAndPublishGraphForDebug().

638  {
639  if (resource_tensor.dtype() != DT_RESOURCE) {
640  return errors::InvalidArgument(strings::StrCat(
641  "ResourceHandleToInputTensor() received non-DT_RESOURCE Tensor: ",
642  resource_tensor.dtype()));
643  }
644 
645  const ResourceHandle& resource_handle =
646  resource_tensor.scalar<ResourceHandle>()();
647 
648  if (resource_handle.container() ==
649  SessionState::kTensorHandleResourceTypeName) {
650  return session_state_.GetTensor(resource_handle.name(), retrieved_tensor);
651  } else {
652  return errors::InvalidArgument(strings::StrCat(
653  "Invalid resource type hash code: ", resource_handle.hash_code(),
654  "(name: ", resource_handle.name(),
655  " type: ", resource_handle.maybe_type_name(),
656  "). Perhaps a resource tensor was being provided as a feed? That is "
657  "not currently allowed. Please file an issue at "
658  "https://github.com/tensorflow/tensorflow/issues/new, ideally with a "
659  "short code snippet that leads to this error message."));
660  }
661 }
Status tensorflow::TBBSession::Run ( const NamedTensorList inputs,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
std::vector< Tensor > *  outputs 
)
override

Definition at line 323 of file TBBSession.cc.

Referenced by DecorateAndPublishGraphForDebug().

326  {
327  RunMetadata run_metadata;
328  return Run(RunOptions(), inputs, output_names, target_nodes, outputs,
329  &run_metadata);
330 }
::tensorflow::Status tensorflow::TBBSession::Run ( const ::tensorflow::RunOptions &  run_options,
const NamedTensorList inputs,
const std::vector< string > &  output_names,
const std::vector< string > &  target_nodes,
std::vector< Tensor > *  outputs,
RunMetadata *  run_metadata 
)
override
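
The extended overload threads RunOptions in and RunMetadata out, e.g. for step tracing (a sketch; "input" and "output" are placeholder node names and x a previously built feed tensor):

  tensorflow::RunOptions run_options;
  run_options.set_trace_level(tensorflow::RunOptions::FULL_TRACE);
  tensorflow::RunMetadata run_metadata;
  std::vector<tensorflow::Tensor> outputs;
  TF_CHECK_OK(session->Run(run_options, {{"input", x}}, {"output"}, {},
                           &outputs, &run_metadata));
  // run_metadata.step_stats() then carries per-node execution stats.
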
void tensorflow::TBBSession::SchedClosure ( tbb::task_arena &  arena,
tbb::task_group &  g,
std::function< void()>  c 
)
private

Definition at line 194 of file TBBSession.cc.


Referenced by DecorateAndPublishGraphForDebug().

194  {
195  arena.execute( [&g, &c] () {g.run( c ); } );
196 }
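
The same arena/task_group pattern, stand-alone in plain TBB:

  #include <tbb/task_arena.h>
  #include <tbb/task_group.h>

  tbb::task_arena arena;  // defaults to the machine concurrency
  tbb::task_group group;
  // SchedClosure(): run the closure inside the arena ...
  arena.execute([&group] { group.run([] { /* closure body */ }); });
  // ... and later join it, as WaitForNotification() does:
  arena.execute([&group] { group.wait(); });
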
tensorflow::TBBSession::TF_DISALLOW_COPY_AND_ASSIGN ( TBBSession  )
private
tensorflow::Status tensorflow::TBBSession::WaitForNotification ( Notification *  n,
int64  timeout_in_ms 
)
private

Definition at line 1150 of file TBBSession.cc.


Referenced by DecorateAndPublishGraphForDebug(), and WaitForNotification().

1151  {
1152  if (timeout_in_ms > 0) {
1153  const int64 timeout_in_us = timeout_in_ms * 1000;
1154  const bool notified =
1155  WaitForNotificationWithTimeout(notification, timeout_in_us);
1156  if (!notified) {
1157  return Status(error::DEADLINE_EXCEEDED,
1158  "Timed out waiting for notification");
1159  }
1160  } else {
1161  notification->WaitForNotification();
1162  }
1163  return Status::OK();
1164 }
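
The timeout branch above builds on TensorFlow's notification utilities, which take the deadline in microseconds (hence the *1000 conversion). Stand-alone:

  tensorflow::Notification done;
  // ... arrange for done.Notify() when the work finishes ...
  if (!tensorflow::WaitForNotificationWithTimeout(&done, 5000 * 1000)) {
    // 5 s deadline passed; TBBSession maps this to error::DEADLINE_EXCEEDED.
  }
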
void tensorflow::TBBSession::WaitForNotification ( tbb::task_arena &  arena,
tbb::task_group &  taskGroup,
RunState *  run_state,
CancellationManager *  cm,
int64  timeout_in_ms 
)
private

Definition at line 1129 of file TBBSession.cc.

References tensorflow::TBBSession::RunState::executors_done, tensorflow::TBBSession::RunState::mu_, and WaitForNotification().

1130  {
1131  // Doing the wait in the arena adds this thread to the arena
1132  // and therefore tasks associated to the group can run on this thread
1133  arena.execute([&taskGroup]() { taskGroup.wait();} );
1134 
1135  const Status status =
1136  WaitForNotification(&run_state->executors_done, timeout_in_ms);
1137  if (!status.ok()) {
1138  {
1139  mutex_lock l(run_state->mu_);
1140  run_state->status.Update(status);
1141  }
1142  cm->StartCancel();
1143  // We must wait for the executors to complete, because they have borrowed
1144  // references to `cm` and other per-step state. After this notification, it
1145  // is safe to clean up the step.
1146  run_state->executors_done.WaitForNotification();
1147  }
1148 }

Friends And Related Function Documentation

friend class DebugGateway
friend

Definition at line 335 of file TBBSession.h.

Member Data Documentation

CancellationManager* tensorflow::TBBSession::cancellation_manager_
private

Definition at line 295 of file TBBSession.h.

Referenced by Close(), DecorateAndPublishGraphForDebug(), and ~TBBSession().

mutex tensorflow::TBBSession::closed_lock_
private

Definition at line 314 of file TBBSession.h.

Referenced by Close().

CostModelManager tensorflow::TBBSession::cost_model_manager_
private

Definition at line 328 of file TBBSession.h.

Referenced by DecorateAndPublishGraphForDebug().

const std::unique_ptr<const DeviceMgr> tensorflow::TBBSession::device_mgr_
private

Definition at line 262 of file TBBSession.h.

DeviceSet tensorflow::TBBSession::device_set_
private
std::vector<Device*> tensorflow::TBBSession::devices_
private

Definition at line 263 of file TBBSession.h.

std::atomic<int64> tensorflow::TBBSession::edge_name_counter_ = {0}
private

Definition at line 318 of file TBBSession.h.

Referenced by CreateGraphs().

mutex tensorflow::TBBSession::executor_lock_
private

Definition at line 278 of file TBBSession.h.

Referenced by DecorateAndPublishGraphForDebug(), and GetOrCreateExecutors().

TBBSessionFactory* const tensorflow::TBBSession::factory_
private

Definition at line 294 of file TBBSession.h.

Referenced by Close().

std::unique_ptr<FunctionLibraryDefinition> tensorflow::TBBSession::flib_def_
private

Definition at line 311 of file TBBSession.h.

mutex tensorflow::TBBSession::graph_def_lock_
private
std::atomic<int64> tensorflow::TBBSession::handle_name_counter_ = {0}
private

Definition at line 319 of file TBBSession.h.

Referenced by GetOrCreateExecutors().

Status tensorflow::TBBSession::init_error_
private

Definition at line 272 of file TBBSession.h.

Referenced by Create().

Executor::Args::NodeOutputsCallback tensorflow::TBBSession::node_outputs_callback_ = nullptr
private

Definition at line 330 of file TBBSession.h.

Referenced by GetOrCreateExecutors().

const int64 tensorflow::TBBSession::operation_timeout_in_ms_ = 0
private

Definition at line 325 of file TBBSession.h.

Referenced by DecorateAndPublishGraphForDebug().

const SessionOptions tensorflow::TBBSession::options_
private

Definition at line 259 of file TBBSession.h.

string tensorflow::TBBSession::session_handle_
private

Definition at line 266 of file TBBSession.h.

Referenced by GetOrCreateExecutors(), TBBSession(), and ~TBBSession().

SessionState tensorflow::TBBSession::session_state_
private

Definition at line 292 of file TBBSession.h.

Referenced by DecorateAndPublishGraphForDebug(), and ResourceHandleToInputTensor().

std::atomic_int_fast64_t tensorflow::TBBSession::step_id_counter_
static private

Definition at line 322 of file TBBSession.h.

Referenced by DecorateAndPublishGraphForDebug().

bool tensorflow::TBBSession::sync_on_finish_ = true
private

Definition at line 275 of file TBBSession.h.

Referenced by DecorateAndPublishGraphForDebug(), and TBBSession().