
TBBSession.cc
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 //NOTE: The memory layout of the Node class changes depending on whether NDEBUG was
16 // set when tensorflow was compiled. The reason is that the Node class holds two EdgeSet
17 // class instances and EdgeSet adds a data member if NDEBUG is set
18 
19 /*
20 This file is an adaptation of the original direct_session.cc file located at
21 https://github.com/tensorflow/tensorflow/blob/v1.5.0/tensorflow/core/common_runtime/direct_session.cc
22 to meet the demands of the software environment developed and used by the CMS collaboration.
23 
24 Changes with respect to the original code are documented in the TBBSession.h header file.
25 */
26 
27 #if !defined(NDEBUG)
28 #define NDEBUG 1
29 #endif
30 
31 #include "TBBSession.h"
32 
33 #include <atomic>
34 #include <string>
35 #include <vector>
36 
37 #include "tbb/task_group.h"
40 
41 #include "tensorflow/core/common_runtime/constant_folding.h"
42 #include "tensorflow/core/common_runtime/debugger_state_interface.h"
43 #include "tensorflow/core/common_runtime/device_factory.h"
44 #include "tensorflow/core/common_runtime/executor.h"
45 #include "tensorflow/core/common_runtime/function.h"
46 #include "tensorflow/core/common_runtime/graph_optimizer.h"
47 #include "tensorflow/core/common_runtime/memory_types.h"
48 #include "tensorflow/core/common_runtime/optimization_registry.h"
49 #include "tensorflow/core/common_runtime/step_stats_collector.h"
50 #include "tensorflow/core/framework/function.h"
51 #include "tensorflow/core/framework/graph.pb_text.h"
52 #include "tensorflow/core/framework/graph.pb.h"
53 #include "tensorflow/core/framework/graph_def_util.h"
54 #include "tensorflow/core/framework/log_memory.h"
55 #include "tensorflow/core/framework/node_def.pb.h"
56 #include "tensorflow/core/framework/tensor.h"
57 #include "tensorflow/core/framework/versions.pb.h"
58 #include "tensorflow/core/graph/algorithm.h"
59 #include "tensorflow/core/graph/graph.h"
60 #include "tensorflow/core/graph/graph_constructor.h"
61 #include "tensorflow/core/graph/graph_partition.h"
62 #include "tensorflow/core/graph/subgraph.h"
63 #include "tensorflow/core/graph/tensor_id.h"
64 #include "tensorflow/core/lib/core/errors.h"
65 #include "tensorflow/core/lib/core/notification.h"
66 #include "tensorflow/core/lib/core/refcount.h"
67 #include "tensorflow/core/lib/core/status.h"
68 #include "tensorflow/core/lib/gtl/array_slice.h"
69 #include "tensorflow/core/lib/gtl/stl_util.h"
70 #include "tensorflow/core/lib/monitoring/counter.h"
71 #include "tensorflow/core/lib/strings/numbers.h"
72 #include "tensorflow/core/lib/strings/str_util.h"
73 #include "tensorflow/core/lib/strings/strcat.h"
74 #include "tensorflow/core/platform/cpu_info.h"
75 #include "tensorflow/core/platform/device_tracer.h"
76 #include "tensorflow/core/platform/logging.h"
77 #include "tensorflow/core/platform/mutex.h"
78 #include "tensorflow/core/platform/types.h"
79 #include "tensorflow/core/util/device_name_utils.h"
80 #include "tensorflow/core/util/env_var.h"
81 
82 
83 namespace tensorflow {
84 
85 namespace {
86 
87 CMS_THREAD_SAFE auto* tbb_session_runs = monitoring::Counter<0>::New(
88  "/tensorflow/core/tbb_session_runs",
89  "The number of times TBBSession::Run() has been called.");
90 
91 // TODO(vrv): Figure out how to unify the many different functions
92 // that generate RendezvousKey, since many of them have to be
93 // consistent with each other.
94 string GetRendezvousKey(const string& tensor_name,
95  const DeviceAttributes& device_info,
96  const FrameAndIter& frame_iter) {
97  return strings::StrCat(device_info.name(), ";",
98  strings::FpToString(device_info.incarnation()), ";",
99  device_info.name(), ";", tensor_name, ";",
100  frame_iter.frame_id, ":", frame_iter.iter_id);
101 }
102 
103 } // namespace
104 
105 class TBBSessionFactory : public SessionFactory {
106  public:
107  TBBSessionFactory() {}
108 
109  bool AcceptsOptions(const SessionOptions& options) override {
110  return options.target == "tbb";
111  }
112 
113  Session* NewSession(const SessionOptions& options) override {
114  // Must do this before the CPU allocator is created.
115  if (options.config.graph_options().build_cost_model() > 0) {
116  EnableCPUAllocatorFullStats(true);
117  }
118  std::vector<Device*> devices;
119  const Status s = DeviceFactory::AddDevices(
120  options, "/job:localhost/replica:0/task:0", &devices);
121  if (!s.ok()) {
122  LOG(ERROR) << s;
123  return nullptr;
124  }
125 
126  TBBSession* session =
127  new TBBSession(options, new DeviceMgr(devices), this);
128  {
129  mutex_lock l(sessions_lock_);
130  sessions_.push_back(session);
131  }
132  return session;
133  }
134 
135  Status Reset(const SessionOptions& options,
136  const std::vector<string>& containers) override {
137  std::vector<TBBSession*> sessions_to_reset;
138  {
139  mutex_lock l(sessions_lock_);
140  // We create a copy to ensure that we don't have a deadlock when
141  // session->Close calls the TBBSessionFactory.Deregister, which
142  // acquires sessions_lock_.
143  std::swap(sessions_to_reset, sessions_);
144  }
145  Status s;
146  for (auto session : sessions_to_reset) {
147  s.Update(session->Reset(containers));
148  }
149  // TODO(suharshs): Change the Reset behavior of all SessionFactories so that
150  // it doesn't close the sessions?
151  for (auto session : sessions_to_reset) {
152  s.Update(session->Close());
153  }
154  return s;
155  }
156 
157  void Deregister(const TBBSession* session) {
158  mutex_lock l(sessions_lock_);
159  sessions_.erase(std::remove(sessions_.begin(), sessions_.end(), session),
160  sessions_.end());
161  }
162 
163  private:
164  mutex sessions_lock_;
165  std::vector<TBBSession*> sessions_ GUARDED_BY(sessions_lock_);
166 };
167 
168 class TBBSessionRegistrar {
169  public:
170  TBBSessionRegistrar() {
171  SessionFactory::Register("TBB_SESSION", new TBBSessionFactory());
172  }
173 };
174 static TBBSessionRegistrar registrar;
175 
176 std::atomic_int_fast64_t TBBSession::step_id_counter_(1);
177 
178 // NOTE: On Android with a single device, there is never
179 // a risk of an OpKernel blocking indefinitely:
180 //
181 // 1) No operations do I/O that depends on other simultaneous kernels,
182 //
183 // 2) Recv nodes always complete immediately: The inputs are sent into
184 // the local rendezvous before we start the executor, so the
185 // corresponding recvs will not block.
186 //
187 // Based on these assumptions, we can use the same thread pool for
188 // both "non-blocking" and "blocking" OpKernels on Android.
189 //
190 // This may change down the road when we add support for multiple
191 // devices that run concurrently, in which case we will need to
192 // revisit this decision.
193 // Override to allow CMSSW FWK to schedule
194 void TBBSession::SchedClosure(tbb::task_arena& arena, tbb::task_group& g, std::function<void()> c) {
195  arena.execute( [&g, &c] () {g.run( c ); } );
196 }
197 
198 TBBSession::TBBSession(const SessionOptions& options,
199  const DeviceMgr* device_mgr,
200  TBBSessionFactory* const factory)
201  : options_(options),
202  device_mgr_(device_mgr),
203  factory_(factory),
204  cancellation_manager_(new CancellationManager()),
205  operation_timeout_in_ms_(options_.config.operation_timeout_in_ms()) {
206  // The default value of sync_on_finish will be flipped soon and this
207  // environment variable will be removed as well.
208  const Status status =
209  ReadBoolFromEnvVar("TF_SYNC_ON_FINISH", true, &sync_on_finish_);
210  if (!status.ok()) {
211  LOG(ERROR) << status.error_message();
212  }
213  // NOTE(mrry): We do not need to use a unique string for the session
214  // handle, because TBBSession owns its devices. This may change
215  // in future versions.
216  session_handle_ = "tbb";
217  int devices_added = 0;
218  if (options.config.log_device_placement()) {
219  const string mapping_str = device_mgr_->DeviceMappingString();
220  if (mapping_str.empty()) {
221  printf("Device mapping: no known devices.\n");
222  } else {
223  printf("Device mapping:\n%s", mapping_str.c_str());
224  }
225  LOG(INFO) << "Device mapping:\n" << mapping_str;
226  }
227  for (auto d : device_mgr_->ListDevices()) {
228  devices_.push_back(d);
229  device_set_.AddDevice(d);
230  d->op_segment()->AddHold(session_handle_);
231 
232  // The first device added is special: it is the 'client device' (a
233  // CPU device) from which we feed and fetch Tensors.
234  if (devices_added == 0) {
235  device_set_.set_client_device(d);
236  }
237  ++devices_added;
238  }
239 }
240 
241 TBBSession::~TBBSession() {
242  if (!closed_) Close().IgnoreError();
243  for (auto& it : partial_runs_) {
244  it.second.reset(nullptr);
245  }
246  for (auto& it : executors_) {
247  it.second.reset();
248  }
249  for (auto d : device_mgr_->ListDevices()) {
250  d->op_segment()->RemoveHold(session_handle_);
251  }
252  delete cancellation_manager_;
253 
254  execution_state_.reset(nullptr);
255  flib_def_.reset(nullptr);
256 }
257 
258 Status TBBSession::MaybeInitializeExecutionState(
259  const GraphDef& graph, bool* out_already_initialized) {
260  // If already initialized, do nothing.
261  if (flib_def_ && execution_state_) {
262  *out_already_initialized = true;
263  return Status::OK();
264  }
265  // Set up the per-session execution state.
266  // NOTE(mrry): The function library created here will be used for
267  // all subsequent extensions of the graph.
268  flib_def_.reset(
269  new FunctionLibraryDefinition(OpRegistry::Global(), graph.library()));
270  GraphExecutionStateOptions options;
271  options.device_set = &device_set_;
272  options.session_options = &options_;
273  // TODO(mrry,suharshs): We explicitly copy `graph` so that
274  // `MakeForBaseGraph()` can take ownership of its
275  // contents. Previously this happened implicitly in calls to the
276  // `GraphExecutionState`. Other sessions call
277  // `MakeForBaseGraph` in such a way that we can destructively read
278  // the passed-in `GraphDef`. In principle we could do the same here,
279  // with a wider refactoring; we might revise the direct session so
280  // that it copies the graph fewer times.
281  GraphDef temp(graph);
282  TF_RETURN_IF_ERROR(
283  GraphExecutionState::MakeForBaseGraph(&temp, options, &execution_state_));
284  graph_created_ = true;
285  *out_already_initialized = false;
286  return Status::OK();
287 }
288 
289 Status TBBSession::Create(const GraphDef& graph) {
290  TF_RETURN_IF_ERROR(init_error_);
291  if (graph.node_size() > 0) {
292  mutex_lock l(graph_def_lock_);
293  if (graph_created_) {
294  return errors::AlreadyExists(
295  "A Graph has already been created for this session.");
296  }
297  return ExtendLocked(graph);
298  }
299  return Status::OK();
300 }
301 
302 Status TBBSession::Extend(const GraphDef& graph) {
303  TF_RETURN_IF_ERROR(CheckNotClosed());
304  mutex_lock l(graph_def_lock_);
305  return ExtendLocked(graph);
306 }
307 
308 Status TBBSession::ExtendLocked(const GraphDef& graph) {
309  bool already_initialized;
310  // If this is the first call, we can initialize the execution state
311  // with `graph` and do not need to call `Extend()`.
312  TF_RETURN_IF_ERROR(
313  MaybeInitializeExecutionState(graph, &already_initialized));
314  if (already_initialized) {
315  TF_RETURN_IF_ERROR(flib_def_->AddLibrary(graph.library()));
316  std::unique_ptr<GraphExecutionState> state;
317  TF_RETURN_IF_ERROR(execution_state_->Extend(graph, &state));
318  execution_state_.swap(state);
319  }
320  return Status::OK();
321 }
322 
323 Status TBBSession::Run(const NamedTensorList& inputs,
324  const std::vector<string>& output_names,
325  const std::vector<string>& target_nodes,
326  std::vector<Tensor>* outputs) {
327  RunMetadata run_metadata;
328  return Run(RunOptions(), inputs, output_names, target_nodes, outputs,
329  &run_metadata);
330 }
331 
332 Status TBBSession::CreateDebuggerState(
333  const DebugOptions& debug_options, int64 session_run_index,
334  int64 executor_step_index, const std::vector<string>& input_names,
335  const std::vector<string>& output_names,
336  const std::vector<string>& target_names,
337  std::unique_ptr<DebuggerStateInterface>* debugger_state) {
338  TF_RETURN_IF_ERROR(
339  DebuggerStateRegistry::CreateState(debug_options, debugger_state));
340  TF_RETURN_IF_ERROR(debugger_state->get()->PublishDebugMetadata(
341  debug_options.global_step(), session_run_index, executor_step_index,
342  input_names, output_names, target_names));
343  return Status::OK();
344 }
345 
346 Status TBBSession::DecorateAndPublishGraphForDebug(
347  const DebugOptions& debug_options, Graph* graph, Device* device) {
348  std::unique_ptr<DebugGraphDecoratorInterface> decorator;
349  TF_RETURN_IF_ERROR(
350  DebugGraphDecoratorRegistry::CreateDecorator(debug_options, &decorator));
351 
352  TF_RETURN_IF_ERROR(decorator->DecorateGraph(graph, device));
353  TF_RETURN_IF_ERROR(decorator->PublishGraph(*graph, device->name()));
354  return Status::OK();
355 }
356 
357 Status TBBSession::Run(const RunOptions& run_options,
358  const NamedTensorList& inputs,
359  const std::vector<string>& output_names,
360  const std::vector<string>& target_nodes,
361  std::vector<Tensor>* outputs,
362  RunMetadata* run_metadata) {
363  TF_RETURN_IF_ERROR(CheckNotClosed());
364  tbb_session_runs->GetCell()->IncrementBy(1);
365  {
366  mutex_lock l(graph_def_lock_);
367  if (!graph_created_) {
368  return errors::InvalidArgument(
369  "Session was not created with a graph before Run()!");
370  }
371  }
372 
373  // Extract the input names for this run of the session.
374  std::vector<string> input_tensor_names;
375  input_tensor_names.reserve(inputs.size());
376  for (const auto& it : inputs) {
377  input_tensor_names.push_back(it.first);
378  }
379 
380  // Check if we already have an executor for these arguments.
381  ExecutorsAndKeys* executors_and_keys;
382  RunStateArgs run_state_args(run_options.debug_options());
383 
384  Executor::Args args;
385  args.step_id = step_id_counter_.fetch_add(1);
386 
387  TF_RETURN_IF_ERROR(
388  GetOrCreateExecutors(input_tensor_names, output_names, target_nodes,
389  &executors_and_keys, &run_state_args));
390  const int64 executor_step_count = executors_and_keys->step_count.fetch_add(1);
391 
392  std::unique_ptr<DebuggerStateInterface> debugger_state;
393  if (!run_options.debug_options().debug_tensor_watch_opts().empty()) {
394  TF_RETURN_IF_ERROR(CreateDebuggerState(
395  run_options.debug_options(), args.step_id, executor_step_count,
396  input_tensor_names, output_names, target_nodes, &debugger_state));
397  }
398 
399  // Configure a call frame for the step, which we use to feed and
400  // fetch values to and from the executors.
401  FunctionCallFrame call_frame(executors_and_keys->input_types,
402  executors_and_keys->output_types);
403  gtl::InlinedVector<Tensor, 4> feed_args(inputs.size());
404  for (const auto& it : inputs) {
405  if (it.second.dtype() == DT_RESOURCE) {
406  Tensor tensor_from_handle;
407  TF_RETURN_IF_ERROR(
408  ResourceHandleToInputTensor(it.second, &tensor_from_handle));
409  feed_args[executors_and_keys->input_name_to_index[it.first]] =
410  tensor_from_handle;
411  } else {
412  feed_args[executors_and_keys->input_name_to_index[it.first]] = it.second;
413  }
414  }
415  const Status s = call_frame.SetArgs(feed_args);
416  if (errors::IsInternal(s)) {
417  return errors::InvalidArgument(s.error_message());
418  } else if (!s.ok()) {
419  return s;
420  }
421 
422  // Create a run state and start execution.
423  RunState run_state(args.step_id, &devices_);
424  run_state.rendez = new IntraProcessRendezvous(device_mgr_.get());
425  CancellationManager step_cancellation_manager;
426  args.call_frame = &call_frame;
427 
428  // Use a task_arena to avoid having unrelated tasks start
429  // running on this thread (which could lead to deadlocks)
430  tbb::task_arena taskArena;
431  tbb::task_group taskGroup;
432  // we are required to always call wait() before the task_group destructor runs
433  auto doneWithTaskGroup = [&taskArena, &taskGroup](void *) { taskArena.execute([&taskGroup]() { taskGroup.wait();}); };
434  std::unique_ptr<tbb::task_group, decltype(doneWithTaskGroup) > guard(&taskGroup, doneWithTaskGroup);
435 
436  // Start parallel Executors.
437  const size_t num_executors = executors_and_keys->items.size();
438  ExecutorBarrier* barrier = new ExecutorBarrier(
439  num_executors, run_state.rendez, [&run_state](const Status& ret) {
440  {
441  mutex_lock l(run_state.mu_);
442  run_state.status.Update(ret);
443  }
444  run_state.executors_done.Notify();
445  });
446 
447  args.rendezvous = run_state.rendez;
448  args.cancellation_manager = &step_cancellation_manager;
449 
450  args.session_state = &session_state_;
451  args.tensor_store = &run_state.tensor_store;
452  args.step_container = &run_state.step_container;
453  if (LogMemory::IsEnabled()) {
454  LogMemory::RecordStep(args.step_id, run_state_args.handle);
455  }
456  args.sync_on_finish = sync_on_finish_;
457 
458  const bool do_trace = (run_options.trace_level() > RunOptions::NO_TRACE);
459 
460  bool update_cost_model = false;
461  if (options_.config.graph_options().build_cost_model() > 0) {
462  const int64 build_cost_model_every =
463  options_.config.graph_options().build_cost_model();
464  const int64 build_cost_model_after =
465  options_.config.graph_options().build_cost_model_after();
466  int64 measure_step_count = executor_step_count - build_cost_model_after;
467  if (measure_step_count >= 0) {
468  update_cost_model =
469  ((measure_step_count + 1) % build_cost_model_every == 0);
470  }
471  }
472  if (do_trace || update_cost_model ||
473  run_options.report_tensor_allocations_upon_oom()) {
474  run_state.collector.reset(
475  new StepStatsCollector(run_metadata->mutable_step_stats()));
476  args.stats_collector = run_state.collector.get();
477  }
478 
479  std::unique_ptr<DeviceTracer> tracer;
480  if (run_options.trace_level() >= RunOptions::HARDWARE_TRACE) {
481  tracer = CreateDeviceTracer();
482  // tracer may be NULL on platforms without accelerators.
483  if (tracer) {
484  Status s = tracer->Start();
485  if (!s.ok()) {
486  run_state.executors_done.Notify();
487  delete barrier;
488  return s;
489  }
490  }
491  }
492 
493  // Register this step with session's cancellation manager, so that
494  // `Session::Close()` will cancel the step.
495  const CancellationToken cancellation_token =
496  cancellation_manager_->get_cancellation_token();
497  const bool already_cancelled = !cancellation_manager_->RegisterCallback(
498  cancellation_token, [&step_cancellation_manager]() {
499  step_cancellation_manager.StartCancel();
500  });
501  if (already_cancelled) {
502  // NOTE(mrry): If we don't explicitly notify
503  // `run_state.executors_done`, the RunState destructor would
504  // block on this notification.
505  run_state.executors_done.Notify();
506  delete barrier;
507  return errors::Cancelled("Run call was cancelled");
508  }
509 
510  // pass taskArena and taskGroup to SchedClosure
511  // consequently, disable TF's own thread logic inside the loop
512  Executor::Args::Runner default_runner = [this, &taskArena, &taskGroup](Executor::Args::Closure c) {
513  SchedClosure(taskArena, taskGroup, std::move(c));
514  };
515  for (const auto& item : executors_and_keys->items) {
516  // TODO(zhengxq): support partial run.
517  // TODO(zhengxq): if the device picks its own threadpool, we need to assign
518  // less threads to the main compute pool by default.
519  // thread::ThreadPool* device_thread_pool =
520  // item.device->tensorflow_device_thread_pool();
521  // if (!device_thread_pool) {
522  // args.runner = default_runner;
523  // } else {
524  // args.runner = [this, device_thread_pool](Executor::Args::Closure c) {
525  // SchedClosure(device_thread_pool, std::move(c));
526  // };
527  // }
528  args.runner = default_runner;
529  item.executor->RunAsync(args, barrier->Get());
530  }
531 
532  // WaitForNotification will handle calling wait on taskGroup
533  guard.release();
534  WaitForNotification(taskArena, taskGroup, &run_state, &step_cancellation_manager,
535  run_options.timeout_in_ms() > 0
536  ? run_options.timeout_in_ms()
537  : operation_timeout_in_ms_);
538 
539  if (!cancellation_manager_->DeregisterCallback(cancellation_token)) {
540  // The step has been cancelled: make sure we don't attempt to receive the
541  // outputs as this would make it block forever.
542  mutex_lock l(run_state.mu_);
543  run_state.status.Update(errors::Cancelled("Run call was cancelled"));
544  }
545 
546  if (tracer) {
547  TF_RETURN_IF_ERROR(tracer->Stop());
548  TF_RETURN_IF_ERROR(tracer->Collect(args.stats_collector));
549  }
550 
551  {
552  mutex_lock l(run_state.mu_);
553  TF_RETURN_IF_ERROR(run_state.status);
554  }
555 
556  // Receive outputs.
557  if (outputs) {
558  std::vector<Tensor> sorted_outputs;
559  const Status s = call_frame.ConsumeRetvals(&sorted_outputs);
560  if (errors::IsInternal(s)) {
561  return errors::InvalidArgument(s.error_message());
562  } else if (!s.ok()) {
563  return s;
564  }
565  const bool unique_outputs =
566  output_names.size() == executors_and_keys->output_name_to_index.size();
567  // first_indices[i] = j implies that j is the smallest value for which
568  // output_names[i] == output_names[j].
569  std::vector<int> first_indices;
570  if (!unique_outputs) {
571  first_indices.resize(output_names.size());
572  for (int i = 0; i < static_cast<int>(output_names.size()); ++i) {
573  for (int j = 0; j <= i; ++j) {
574  if (output_names[i] == output_names[j]) {
575  first_indices[i] = j;
576  break;
577  }
578  }
579  }
580  }
581  outputs->clear();
582  outputs->reserve(sorted_outputs.size());
583  for (int i = 0; i < static_cast<int>(output_names.size()); ++i) {
584  const string& output_name = output_names[i];
585  if (first_indices.empty() || first_indices[i] == i) {
586  outputs->emplace_back(
587  std::move(sorted_outputs[executors_and_keys
588  ->output_name_to_index[output_name]]));
589  } else {
590  outputs->push_back((*outputs)[first_indices[i]]);
591  }
592  }
593  }
594 
595  // Save the output tensors of this run that we choose to keep.
596  TF_RETURN_IF_ERROR(
597  run_state.tensor_store.SaveTensors(output_names, &session_state_));
598  if (args.stats_collector) {
599  args.stats_collector->Finalize();
600  }
601 
602  // Build and return the cost model as instructed.
603  mutex_lock l(executor_lock_);
604  if (update_cost_model) {
605  // Build the cost model
606  std::unordered_map<string, const Graph*> device_to_graph;
607  for (const PerPartitionExecutorsAndLib& partition :
608  executors_and_keys->items) {
609  const Graph* graph = partition.graph;
610  const string device = partition.flib->device()->name();
611  device_to_graph[device] = graph;
612  }
613  args.stats_collector->BuildCostModel(&cost_model_manager_, device_to_graph);
614 
615  // annotate stats onto cost graph.
616  CostGraphDef* cost_graph = run_metadata->mutable_cost_graph();
617  for (const auto& item : executors_and_keys->items) {
618  TF_RETURN_IF_ERROR(
619  cost_model_manager_.AddToCostGraphDef(item.graph, cost_graph));
620  }
621  }
622 
623  // If requested via RunOptions, output the partition graphs.
624  if (run_options.output_partition_graphs()) {
625  protobuf::RepeatedPtrField<GraphDef>* partition_graph_defs =
626  run_metadata->mutable_partition_graphs();
627  for (const PerPartitionExecutorsAndLib& exec_and_lib :
628  executors_and_keys->items) {
629  GraphDef* partition_graph_def = partition_graph_defs->Add();
630  exec_and_lib.graph->ToGraphDef(partition_graph_def);
631  }
632  }
633 
634  return Status::OK();
635 }
636 
637 Status TBBSession::ResourceHandleToInputTensor(const Tensor& resource_tensor,
638  Tensor* retrieved_tensor) {
639  if (resource_tensor.dtype() != DT_RESOURCE) {
640  return errors::InvalidArgument(strings::StrCat(
641  "ResourceHandleToInputTensor() received non-DT_RESOURCE Tensor: ",
642  resource_tensor.dtype()));
643  }
644 
645  const ResourceHandle& resource_handle =
646  resource_tensor.scalar<ResourceHandle>()();
647 
648  if (resource_handle.container() ==
649  SessionState::kTensorHandleResourceTypeName) {
650  return session_state_.GetTensor(resource_handle.name(), retrieved_tensor);
651  } else {
652  return errors::InvalidArgument(strings::StrCat(
653  "Invalid resource type hash code: ", resource_handle.hash_code(),
654  "(name: ", resource_handle.name(),
655  " type: ", resource_handle.maybe_type_name(),
656  "). Perhaps a resource tensor was being provided as a feed? That is "
657  "not currently allowed. Please file an issue at "
658  "https://github.com/tensorflow/tensorflow/issues/new, ideally with a "
659  "short code snippet that leads to this error message."));
660  }
661 }
662 
663 Status TBBSession::GetOrCreateExecutors(
664  gtl::ArraySlice<string> inputs, gtl::ArraySlice<string> outputs,
665  gtl::ArraySlice<string> target_nodes, ExecutorsAndKeys** executors_and_keys,
666  RunStateArgs* run_state_args) {
667  int64 handle_name_counter_value = -1;
668  if (LogMemory::IsEnabled() || run_state_args->is_partial_run) {
669  handle_name_counter_value = handle_name_counter_.fetch_add(1);
670  }
671 
672  string debug_tensor_watches_summary;
673  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
674  debug_tensor_watches_summary = SummarizeDebugTensorWatches(
675  run_state_args->debug_options.debug_tensor_watch_opts());
676  }
677 
678  // Fast lookup path, no sorting.
679  const string key = strings::StrCat(
680  str_util::Join(inputs, ","), "->", str_util::Join(outputs, ","), "/",
681  str_util::Join(target_nodes, ","), "/", run_state_args->is_partial_run,
682  "/", debug_tensor_watches_summary);
683  // Set the handle, if it's needed to log memory or for partial run.
684  if (handle_name_counter_value >= 0) {
685  run_state_args->handle =
686  strings::StrCat(key, ";", handle_name_counter_value);
687  }
688 
689  // See if we already have the executors for this run.
690  {
691  mutex_lock l(executor_lock_); // could use reader lock
692  auto it = executors_.find(key);
693  if (it != executors_.end()) {
694  *executors_and_keys = it->second.get();
695  return Status::OK();
696  }
697  }
698 
699  // Slow lookup path: the unsorted key missed the cache.
700  // Sort the inputs and outputs, and look up with the sorted key in case an
701  // earlier call used a different order of inputs and outputs.
702  //
703  // We could consider some other signature instead of sorting that
704  // preserves the same property to avoid the sort in the future.
705  std::vector<string> inputs_sorted(inputs.begin(), inputs.end());
706  std::sort(inputs_sorted.begin(), inputs_sorted.end());
707  std::vector<string> outputs_sorted(outputs.begin(), outputs.end());
708  std::sort(outputs_sorted.begin(), outputs_sorted.end());
709  std::vector<string> tn_sorted(target_nodes.begin(), target_nodes.end());
710  std::sort(tn_sorted.begin(), tn_sorted.end());
711 
712  const string sorted_key = strings::StrCat(
713  str_util::Join(inputs_sorted, ","), "->",
714  str_util::Join(outputs_sorted, ","), "/", str_util::Join(tn_sorted, ","),
715  "/", run_state_args->is_partial_run, "/", debug_tensor_watches_summary);
716  // Set the handle, if it's needed to log memory or for partial run.
717  if (handle_name_counter_value >= 0) {
718  run_state_args->handle =
719  strings::StrCat(sorted_key, ";", handle_name_counter_value);
720  }
721 
722  // See if we already have the executors for this run.
723  {
724  mutex_lock l(executor_lock_);
725  auto it = executors_.find(sorted_key);
726  if (it != executors_.end()) {
727  *executors_and_keys = it->second.get();
728  // Insert this under the original key.
729  executors_.emplace(key, it->second);
730  return Status::OK();
731  }
732  }
733 
734  // Nothing found, so create the executors and store in the cache.
735  BuildGraphOptions options;
736  options.feed_endpoints = inputs_sorted;
737  options.fetch_endpoints = outputs_sorted;
738  options.target_nodes = tn_sorted;
739  options.use_function_convention = !run_state_args->is_partial_run;
740  if (!run_state_args->debug_options.debug_tensor_watch_opts().empty()) {
741  options.debug_options = run_state_args->debug_options;
742  }
743 
744  std::shared_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys);
745 
746  // The executor_lock_ is intentionally released while executor is
747  // being created.
748  std::unordered_map<string, std::unique_ptr<Graph>> graphs;
749  TF_RETURN_IF_ERROR(CreateGraphs(options, &graphs, &ek->flib_def,
750  run_state_args, &ek->input_types,
751  &ek->output_types));
752 
753  if (run_state_args->is_partial_run) {
754  ek->graph = std::move(run_state_args->graph);
755  std::unordered_set<StringPiece, StringPieceHasher> names;
756  for (const string& input : inputs) {
757  TensorId id(ParseTensorName(input));
758  names.emplace(id.first);
759  }
760  for (const string& output : outputs) {
761  TensorId id(ParseTensorName(output));
762  names.emplace(id.first);
763  }
764  for (Node* n : ek->graph->nodes()) {
765  if (names.count(n->name()) > 0) {
766  ek->name_to_node.insert({n->name(), n});
767  }
768  }
769  }
770  ek->items.reserve(graphs.size());
771  const auto& optimizer_opts =
772  options_.config.graph_options().optimizer_options();
773 
774  int graph_def_version;
775  {
776  mutex_lock l(graph_def_lock_);
777  graph_def_version =
778  execution_state_->original_graph_def().versions().producer();
779  }
780  ek->proc_flr.reset(new ProcessFunctionLibraryRuntime(
781  device_mgr_.get(), options_.env, graph_def_version, ek->flib_def.get(),
782  optimizer_opts));
783 
784  GraphOptimizer optimizer(optimizer_opts);
785  for (auto iter = graphs.begin(); iter != graphs.end(); ++iter) {
786  const string& partition_name = iter->first;
787  std::unique_ptr<Graph>& partition_graph = iter->second;
788 
789  Device* device;
790  TF_RETURN_IF_ERROR(device_mgr_->LookupDevice(partition_name, &device));
791 
792  ek->items.resize(ek->items.size() + 1);
793  auto* item = &(ek->items.back());
794  auto lib = ek->proc_flr->GetFLR(partition_name);
795  if (lib == nullptr) {
796  return errors::Internal("Could not find device: ", partition_name);
797  }
798  item->flib = lib;
799 
800  LocalExecutorParams params;
801  params.device = device;
802  params.function_library = lib;
803  auto opseg = device->op_segment();
804  params.create_kernel = [this, lib, opseg](const NodeDef& ndef,
805  OpKernel** kernel) {
806  // We do not share the kernel via the OpSegment if the node is
807  // stateless, or a function.
808  // NOTE(mrry): We must not share function kernels (implemented
809  // using `CallOp`) between subgraphs, because `CallOp::handle_`
810  // is tied to a particular subgraph. Even if the function itself
811  // is stateful, the `CallOp` that invokes it is not.
812  if (!lib->IsStateful(ndef.op()) ||
813  lib->GetFunctionLibraryDefinition()->Find(ndef.op()) != nullptr) {
814  return lib->CreateKernel(ndef, kernel);
815  }
816  auto create_fn = [lib, &ndef](OpKernel** kernel) {
817  return lib->CreateKernel(ndef, kernel);
818  };
819  // Kernels created for subgraph nodes need to be cached. On
820  // cache miss, create_fn() is invoked to create a kernel based
821  // on the function library here + global op registry.
822  return opseg->FindOrCreate(session_handle_, ndef.name(), kernel,
823  create_fn);
824  };
825  params.delete_kernel = [lib](OpKernel* kernel) {
826  // If the node is stateful, opseg owns it. Otherwise, delete it.
827  if (kernel && !lib->IsStateful(kernel->type_string())) {
828  delete kernel;
829  }
830  };
831  params.node_outputs_cb = node_outputs_callback_;
832 
833  optimizer.Optimize(lib, options_.env, device, &iter->second,
834  /*shape_map=*/nullptr);
835 
836  // EXPERIMENTAL: tfdbg inserts debug nodes in the graph.
837  if (!options.debug_options.debug_tensor_watch_opts().empty()) {
838  TF_RETURN_IF_ERROR(DecorateAndPublishGraphForDebug(
839  options.debug_options, partition_graph.get(), params.device));
840  }
841 
842  TF_RETURN_IF_ERROR(EnsureMemoryTypes(DeviceType(device->device_type()),
843  device->name(),
844  partition_graph.get()));
845  // NewLocalExecutor takes ownership of partition_graph.
846  item->graph = partition_graph.get();
847  item->executor = nullptr;
848  item->device = device;
849  Executor* executor;
850  TF_RETURN_IF_ERROR(
851  NewLocalExecutor(params, partition_graph.release(), &executor));
852  item->executor.reset(executor);
853  }
854 
855  // Cache the mapping from input/output names to graph elements to
856  // avoid recomputing it every time.
857  if (!run_state_args->is_partial_run) {
858  // For regular `Run()`, we use the function calling convention, and so
859  // maintain a mapping from input/output names to
860  // argument/return-value ordinal index.
861  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
862  const string& input = inputs_sorted[i];
863  ek->input_name_to_index[input] = i;
864  }
865  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
866  const string& output = outputs_sorted[i];
867  ek->output_name_to_index[output] = i;
868  }
869  } else {
870  // For `PRun()`, we use the rendezvous calling convention, and so
871  // maintain a mapping from input/output names to rendezvous keys.
872  //
873  // We always use the first device as the device name portion of the
874  // key, even if we're feeding another graph.
875  for (size_t i = 0; i < inputs_sorted.size(); ++i) {
876  const string& input = inputs_sorted[i];
877  ek->input_name_to_rendezvous_key[input] = GetRendezvousKey(
878  input, device_set_.client_device()->attributes(), FrameAndIter(0, 0));
879  }
880  for (size_t i = 0; i < outputs_sorted.size(); ++i) {
881  const string& output = outputs_sorted[i];
882  ek->output_name_to_rendezvous_key[output] =
883  GetRendezvousKey(output, device_set_.client_device()->attributes(),
884  FrameAndIter(0, 0));
885  }
886  }
887 
888  // Reacquire the lock, try to insert into the map.
889  mutex_lock l(executor_lock_);
890 
891  // Another thread may have created the entry before us, in which case we will
892  // reuse the already created one.
893  auto insert_result = executors_.emplace(sorted_key, ek);
894  // Insert the value under the original key, so the fast path lookup will work
895  // if the user uses the same order of inputs, outputs, and targets again.
896  executors_.emplace(key, insert_result.first->second);
897  *executors_and_keys = insert_result.first->second.get();
898 
899  return Status::OK();
900 }
901 
902 Status TBBSession::CreateGraphs(
903  const BuildGraphOptions& subgraph_options,
904  std::unordered_map<string, std::unique_ptr<Graph>>* outputs,
905  std::unique_ptr<FunctionLibraryDefinition>* flib_def,
906  RunStateArgs* run_state_args, DataTypeVector* input_types,
907  DataTypeVector* output_types) {
908  mutex_lock l(graph_def_lock_);
909  std::unique_ptr<ClientGraph> client_graph;
910 
911  std::unique_ptr<GraphExecutionState> temp_exec_state_holder;
912  GraphExecutionState* execution_state = nullptr;
913  if (options_.config.graph_options().place_pruned_graph()) {
914  // Because we are placing pruned graphs, we need to create a
915  // new GraphExecutionState for every new unseen graph,
916  // and then place it.
917  GraphExecutionStateOptions prune_options;
918  prune_options.device_set = &device_set_;
919  prune_options.session_options = &options_;
920  prune_options.stateful_placements = stateful_placements_;
921  TF_RETURN_IF_ERROR(GraphExecutionState::MakeForPrunedGraph(
922  execution_state_->original_graph_def().library(), prune_options,
923  execution_state_->original_graph_def(), subgraph_options,
924  &temp_exec_state_holder, &client_graph));
925  execution_state = temp_exec_state_holder.get();
926  } else {
927  execution_state = execution_state_.get();
928  TF_RETURN_IF_ERROR(
929  execution_state->BuildGraph(subgraph_options, &client_graph));
930  }
931 
932  if (subgraph_options.feed_endpoints.size() !=
933  client_graph->feed_types.size()) {
934  return errors::Internal(
935  "Graph pruning failed: requested number of feed endpoints = ",
936  subgraph_options.feed_endpoints.size(),
937  " versus number of pruned feed endpoints = ",
938  client_graph->feed_types.size());
939  }
940  if (subgraph_options.fetch_endpoints.size() !=
941  client_graph->fetch_types.size()) {
942  return errors::Internal(
943  "Graph pruning failed: requested number of fetch endpoints = ",
944  subgraph_options.fetch_endpoints.size(),
945  " versus number of pruned fetch endpoints = ",
946  client_graph->fetch_types.size());
947  }
948 
949  auto current_stateful_placements = execution_state->GetStatefulPlacements();
950  // Update our current state based on the execution_state's
951  // placements. If there are any mismatches for a node,
952  // we should fail, as this should never happen.
953  for (auto placement_pair : current_stateful_placements) {
954  const string& node_name = placement_pair.first;
955  const string& placement = placement_pair.second;
956  auto iter = stateful_placements_.find(node_name);
957  if (iter == stateful_placements_.end()) {
958  stateful_placements_.insert(std::make_pair(node_name, placement));
959  } else if (iter->second != placement) {
960  return errors::Internal(
961  "Stateful placement mismatch. "
962  "Current assignment of ",
963  node_name, " to ", iter->second, " does not match ", placement);
964  }
965  }
966 
967  stateful_placements_ = execution_state->GetStatefulPlacements();
968 
969  // Remember the graph in run state if this is a partial run.
970  if (run_state_args->is_partial_run) {
971  run_state_args->graph.reset(new Graph(flib_def_.get()));
972  CopyGraph(*execution_state->full_graph(), run_state_args->graph.get());
973  }
974 
975  // Partition the graph across devices.
976  PartitionOptions popts;
977  popts.node_to_loc = [](const Node* node) {
978  assert(node != nullptr);
979  return node->assigned_device_name();
980  };
981  popts.new_name = [this](const string& prefix) {
982  return strings::StrCat(prefix, "/_", edge_name_counter_.fetch_add(1));
983  };
984  popts.get_incarnation = [](const string& name) {
985  // The direct session does not have changing incarnation numbers.
986  // Just return '1'.
987  return 1;
988  };
989  popts.flib_def = &client_graph->graph.flib_def();
990  popts.control_flow_added = false;
991 
992  std::unordered_map<string, GraphDef> partitions;
993  TF_RETURN_IF_ERROR(Partition(popts, &client_graph->graph, &partitions));
994 
995  std::vector<string> device_names;
996  for (auto device : devices_) {
997  // Extract the LocalName from the device.
998  device_names.push_back(DeviceNameUtils::LocalName(device->name()));
999  }
1000 
1001  // Check for valid partitions.
1002  for (const auto& partition : partitions) {
1003  const string local_partition_name =
1004  DeviceNameUtils::LocalName(partition.first);
1005  if (std::count(device_names.begin(), device_names.end(),
1006  local_partition_name) == 0) {
1007  return errors::InvalidArgument(
1008  "Creating a partition for ", local_partition_name,
1009  " which doesn't exist in the list of available devices. Available "
1010  "devices: ",
1011  str_util::Join(device_names, ","));
1012  }
1013  }
1014 
1015  for (const auto& partition : partitions) {
1016  std::unique_ptr<Graph> device_graph(
1017  new Graph(client_graph->flib_def.get()));
1018  GraphConstructorOptions device_opts;
1019  // There are internal operations (e.g., send/recv) that we now allow.
1020  device_opts.allow_internal_ops = true;
1021  device_opts.expect_device_spec = true;
1022  TF_RETURN_IF_ERROR(ConvertGraphDefToGraph(device_opts, partition.second,
1023  device_graph.get()));
1024  outputs->emplace(partition.first, std::move(device_graph));
1025  }
1026 
1027  GraphOptimizationPassOptions optimization_options;
1028  optimization_options.session_options = &options_;
1029  optimization_options.flib_def = client_graph->flib_def.get();
1030  optimization_options.partition_graphs = outputs;
1031  TF_RETURN_IF_ERROR(OptimizationPassRegistry::Global()->RunGrouping(
1032  OptimizationPassRegistry::POST_PARTITIONING, optimization_options));
1033 
1034  Status s;
1035  for (auto& partition : *outputs) {
1036  const string& partition_name = partition.first;
1037  std::unique_ptr<Graph>* graph = &partition.second;
1038 
1039  VLOG(2) << "Created " << DebugString(graph->get()) << " for "
1040  << partition_name;
1041 
1042  // Give the device an opportunity to rewrite its subgraph.
1043  Device* d;
1044  s = device_mgr_->LookupDevice(partition_name, &d);
1045  if (!s.ok()) break;
1046  s = d->MaybeRewriteGraph(graph);
1047  if (!s.ok()) {
1048  break;
1049  }
1050  }
1051  *flib_def = std::move(client_graph->flib_def);
1052  std::swap(*input_types, client_graph->feed_types);
1053  std::swap(*output_types, client_graph->fetch_types);
1054  return s;
1055 }
1056 
1057 ::tensorflow::Status TBBSession::ListDevices(
1058  std::vector<DeviceAttributes>* response) {
1059  response->clear();
1060  response->reserve(devices_.size());
1061  for (Device* d : devices_) {
1062  const DeviceAttributes& attrs = d->attributes();
1063  response->emplace_back(attrs);
1064  }
1065  return ::tensorflow::Status::OK();
1066 }
1067 
1068 ::tensorflow::Status TBBSession::Reset(
1069  const std::vector<string>& containers) {
1070  device_mgr_->ClearContainers(containers);
1071  return ::tensorflow::Status::OK();
1072 }
1073 
1074 ::tensorflow::Status TBBSession::Close() {
1075  cancellation_manager_->StartCancel();
1076  {
1077  mutex_lock l(closed_lock_);
1078  if (closed_) return ::tensorflow::Status::OK();
1079  closed_ = true;
1080  }
1081  if (factory_ != nullptr) factory_->Deregister(this);
1082  return ::tensorflow::Status::OK();
1083 }
1084 
1085 TBBSession::RunState::RunState(
1086  const std::vector<string>& pending_input_names,
1087  const std::vector<string>& pending_output_names, int64 step_id,
1088  const std::vector<Device*>* devices)
1089  : step_container(step_id, [devices](const string& name) {
1090  for (auto d : *devices) {
1091  if (!d->resource_manager()->Cleanup(name).ok()) {
1092  // Do nothing...
1093  }
1094  }
1095  }) {
1096  // Initially all the feeds and fetches are pending.
1097  for (auto& name : pending_input_names) {
1098  pending_inputs[name] = false;
1099  }
1100  for (auto& name : pending_output_names) {
1101  pending_outputs[name] = false;
1102  }
1103 }
1104 
1105 TBBSession::RunState::RunState(int64 step_id,
1106  const std::vector<Device*>* devices)
1107  : RunState({}, {}, step_id, devices) {}
1108 
1109 TBBSession::RunState::~RunState() {
1110  if (rendez != nullptr) {
1111  if (!executors_done.HasBeenNotified()) {
1112  rendez->StartAbort(errors::Cancelled("PRun cancellation"));
1113  executors_done.WaitForNotification();
1114  }
1115  rendez->Unref();
1116  }
1117 }
1118 
1119 bool TBBSession::RunState::PendingDone() const {
1120  for (const auto& it : pending_inputs) {
1121  if (!it.second) return false;
1122  }
1123  for (const auto& it : pending_outputs) {
1124  if (!it.second) return false;
1125  }
1126  return true;
1127 }
1128 
1129 void TBBSession::WaitForNotification(tbb::task_arena& arena, tbb::task_group& taskGroup,
1130  RunState* run_state, CancellationManager* cm, int64 timeout_in_ms) {
1131  // Doing the wait in the arena adds this thread to the arena
1132  // and therefore tasks associated with the group can run on this thread
1133  arena.execute([&taskGroup]() { taskGroup.wait();} );
1134 
1135  const Status status =
1136  WaitForNotification(&run_state->executors_done, timeout_in_ms);
1137  if (!status.ok()) {
1138  {
1139  mutex_lock l(run_state->mu_);
1140  run_state->status.Update(status);
1141  }
1142  cm->StartCancel();
1143  // We must wait for the executors to complete, because they have borrowed
1144  // references to `cm` and other per-step state. After this notification, it
1145  // is safe to clean up the step.
1146  run_state->executors_done.WaitForNotification();
1147  }
1148 }
1149 
1150 ::tensorflow::Status TBBSession::WaitForNotification(
1151  Notification* notification, int64 timeout_in_ms) {
1152  if (timeout_in_ms > 0) {
1153  const int64 timeout_in_us = timeout_in_ms * 1000;
1154  const bool notified =
1155  WaitForNotificationWithTimeout(notification, timeout_in_us);
1156  if (!notified) {
1157  return Status(error::DEADLINE_EXCEEDED,
1158  "Timed out waiting for notification");
1159  }
1160  } else {
1161  notification->WaitForNotification();
1162  }
1163  return Status::OK();
1164 }
1165 
1166 } // namespace tensorflow
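
A minimal usage sketch (not part of TBBSession.cc; the helper name createTBBSession is hypothetical), assuming the static TBBSessionRegistrar above has registered the factory: a caller selects this session type by setting SessionOptions::target to "tbb", which is what TBBSessionFactory::AcceptsOptions() matches on.

#include "tensorflow/core/framework/graph.pb.h"
#include "tensorflow/core/public/session.h"
#include "tensorflow/core/public/session_options.h"

tensorflow::Session* createTBBSession(const tensorflow::GraphDef& graphDef) {
  tensorflow::SessionOptions opts;
  opts.target = "tbb";  // routed to TBBSessionFactory by the session registry
  tensorflow::Session* session = tensorflow::NewSession(opts);
  if (session == nullptr) {
    return nullptr;
  }
  // TBBSession::Create() builds the per-session execution state from the graph.
  tensorflow::Status status = session->Create(graphDef);
  if (!status.ok()) {
    delete session;
    return nullptr;
  }
  return session;
}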