From 85c31852c1cd165485288ed8ad7fbb65310ca198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=87=A7=E5=BA=86=E9=A6=99?= Date: Sun, 28 Apr 2024 21:46:45 +0800 Subject: [PATCH] kbyk profiler performance optimization --- .../hal/profiler/profiling_data_dumper.cc | 104 ++- .../hal/profiler/profiling_data_dumper.h | 61 +- .../profiler/parser/ascend_msprof_exporter.py | 2 +- .../parser/ascend_msprof_generator.py | 364 +++++----- .../profiler/parser/ascend_op_generator.py | 638 +++++++++--------- 5 files changed, 565 insertions(+), 604 deletions(-) diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc index 06e675513918..6a211a1c497a 100644 --- a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc +++ b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc @@ -160,7 +160,6 @@ uint64_t Utils::GetPid() { template void RingBuffer::Init(size_t capacity) { capacity_ = capacity; - mask_ = capacity_ - 1; data_queue_.resize(capacity); is_inited_ = true; is_quit_ = false; @@ -174,7 +173,6 @@ void RingBuffer::UnInit() { write_index_ = 0; idle_write_index_ = 0; capacity_ = 0; - mask_ = 0; is_quit_ = true; is_inited_ = false; } @@ -182,37 +180,32 @@ void RingBuffer::UnInit() { template size_t RingBuffer::Size() { - size_t curr_read_index = read_index_.load(std::memory_order_relaxed); - size_t curr_write_index = write_index_.load(std::memory_order_relaxed); - if (curr_read_index > curr_write_index) { - return capacity_ - (curr_read_index & mask_) + (curr_write_index & mask_); + size_t curr_read_index = read_index_.load(std::memory_order_acquire); + size_t curr_write_index = write_index_.load(std::memory_order_acquire); + if (curr_read_index >= curr_write_index) { + return 0; } return curr_write_index - curr_read_index; } +template +bool RingBuffer::Full() { + size_t curr_write_index = write_index_.load(std::memory_order_acquire); + if 
(curr_write_index >= capacity_) { + return true; + } else { + return false; + } +} + template bool RingBuffer::Push(T data) { size_t curr_write_index = 0; - size_t next_write_index = 0; - size_t cycles = 0; - do { - if (!is_inited_ || is_quit_) { - return false; - } - cycles++; - if (cycles >= 1024) { - return false; - } - size_t curr_read_index = read_index_.load(std::memory_order_relaxed); - curr_write_index = idle_write_index_.load(std::memory_order_relaxed); - next_write_index = curr_write_index + 1; - if ((next_write_index & mask_) == (curr_read_index & mask_)) { - return false; - } - } while (!idle_write_index_.compare_exchange_weak(curr_write_index, next_write_index)); - size_t index = curr_write_index & mask_; - data_queue_[index] = std::move(data); - write_index_++; + curr_write_index = write_index_.fetch_add(1, std::memory_order_acquire); + if (curr_write_index >= capacity_) { + return false; + } + data_queue_[curr_write_index] = std::move(data); return true; } @@ -221,23 +214,27 @@ T RingBuffer::Pop() { if (!is_inited_) { return nullptr; } - size_t curr_read_index = read_index_.load(std::memory_order_relaxed); - size_t curr_write_index = write_index_.load(std::memory_order_relaxed); - if ((curr_read_index & mask_) == (curr_write_index & mask_) && !is_quit_) { + size_t curr_read_index = read_index_.fetch_add(1, std::memory_order_acquire); + size_t curr_write_index = write_index_.load(std::memory_order_acquire); + if (curr_read_index >= curr_write_index || curr_read_index >= capacity_) { return nullptr; } - size_t index = curr_read_index & mask_; - T data = std::move(data_queue_[index]); - read_index_++; + T data = std::move(data_queue_[curr_read_index]); return data; } +template +void RingBuffer::Reset() { + write_index_ = 0; + read_index_ = 0; +} + ProfilingDataDumper::ProfilingDataDumper() : path_(""), start_(false), init_(false) {} ProfilingDataDumper::~ProfilingDataDumper() { UnInit(); } void ProfilingDataDumper::Init(const std::string &path, size_t 
capacity) { - MS_LOG(INFO) << "init profiling data dumper."; + MS_LOG(INFO) << "init profiling data dumper, capacity: " << capacity; path_ = path; data_chunk_buf_.Init(capacity); init_.store(true); @@ -263,18 +260,13 @@ void ProfilingDataDumper::Start() { if (!init_.load() || !Utils::CreateDir(path_)) { return; } - if (Thread::Start() != 0) { - MS_LOG(ERROR) << "profiling data dumper thread start failed."; - return; - } start_.store(true); } void ProfilingDataDumper::Stop() { - MS_LOG(INFO) << "stop profiling data dumper."; + MS_LOG(WARNING) << "stop profiling data dumper."; if (start_.load() == true) { start_.store(false); - Thread::Stop(); } Flush(); } @@ -302,30 +294,34 @@ void ProfilingDataDumper::GatherAndDumpData() { } } -void ProfilingDataDumper::Run() { - for (;;) { - if (!start_.load()) { - break; - } - if (data_chunk_buf_.Size() > kNotifyInterval) { - GatherAndDumpData(); - } else { - usleep(kMaxWaitTimeUs); - } - } -} - void ProfilingDataDumper::Flush() { - while (data_chunk_buf_.Size() != 0) { + MS_LOG(WARNING) << "data_chunk_buf_.Size: " << data_chunk_buf_.Size(); + while (data_chunk_buf_.Size() > 0) { GatherAndDumpData(); } + data_chunk_buf_.Reset(); } void ProfilingDataDumper::Report(std::unique_ptr data) { if (!start_.load() || data == nullptr) { return; } - data_chunk_buf_.Push(std::move(data)); + int i = 0; + while (is_flush_.load() && i < 10) { + usleep(kMaxWaitTimeUs); + i++; + } + if (!data_chunk_buf_.Push(std::move(data))) { + is_flush_.store(true); + std::lock_guard flush_lock_(flush_mutex_); + if (data_chunk_buf_.Full()) { + Flush(); + } + is_flush_.store(false); + if (!data_chunk_buf_.Push(std::move(data))) { + MS_LOG(ERROR) << "profiling data Report failed."; + } + } } void ProfilingDataDumper::Dump(const std::map> &dataMap) { diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h index 3d93e34b4f8f..b3cc51e3263d 100644 --- 
a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h +++ b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h @@ -36,10 +36,10 @@ namespace mindspore { namespace profiler { namespace ascend { -constexpr uint32_t kDefaultRingBuffer = 1024; +constexpr uint32_t kDefaultRingBuffer = 10 * 1000; constexpr uint32_t kBatchMaxLen = 5 * 1024 * 1024; // 5 MB -constexpr uint32_t kMaxWaitTimeUs = 1000 * 1000; -constexpr uint32_t kNotifyInterval = 1000; +constexpr uint32_t kMaxWaitTimeUs = 100 * 1000; +constexpr uint32_t kMaxWaitTimes = 10; class Utils { public: @@ -75,6 +75,8 @@ class RingBuffer { size_t Size(); bool Push(T data); T Pop(); + bool Full(); + void Reset(); private: bool is_inited_; @@ -87,53 +89,6 @@ class RingBuffer { std::vector data_queue_; }; -class Thread { - public: - Thread() : is_alive_(false), pid_(0), thread_name_("NPUProfiler") {} - - ~Thread() { - if (is_alive_) { - (void)pthread_cancel(pid_); - (void)pthread_join(pid_, nullptr); - } - } - - void SetThreadName(const std::string &name) { - if (!name.empty()) { - thread_name_ = name; - } - } - - std::string GetThreadName() { return thread_name_; } - - int Start() { - int ret = pthread_create(&pid_, nullptr, Execute, reinterpret_cast(this)); - is_alive_ = (ret == 0) ? true : false; - return ret; - } - - int Stop() { return Join(); } - - int Join() { - int ret = pthread_join(pid_, nullptr); - is_alive_ = (ret == 0) ? 
false : true; - return ret; - } - - private: - static void *Execute(void *args) { - Thread *thr = reinterpret_cast(args); - thr->Run(); - return nullptr; - } - virtual void Run() = 0; - - private: - bool is_alive_; - pthread_t pid_; - std::string thread_name_; -}; - struct BaseReportData { int32_t device_id{0}; std::string tag; @@ -142,7 +97,7 @@ struct BaseReportData { virtual std::vector encode() = 0; }; -class ProfilingDataDumper : public Thread { +class ProfilingDataDumper { public: ProfilingDataDumper(); virtual ~ProfilingDataDumper(); @@ -151,6 +106,7 @@ class ProfilingDataDumper : public Thread { void Report(std::unique_ptr data); void Start(); void Stop(); + void Flush(); static std::shared_ptr &GetInstance() { static std::shared_ptr instance = std::make_shared(); @@ -158,7 +114,6 @@ class ProfilingDataDumper : public Thread { } private: - void Flush(); void Dump(const std::map> &dataMap); void Run(); void GatherAndDumpData(); @@ -167,8 +122,10 @@ class ProfilingDataDumper : public Thread { std::string path_; std::atomic start_; std::atomic init_; + std::atomic is_flush_{false}; RingBuffer> data_chunk_buf_; std::map fd_map_; + std::mutex flush_mutex_; }; } // namespace ascend diff --git a/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py b/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py index 6c93bb90707a..61075e173c34 100644 --- a/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py +++ b/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py @@ -253,7 +253,7 @@ class AscendMsprofExporter: if not op_summary: raise RuntimeError("The op_summary csv file was not found, perhaps the original data was not collected.") if not op_statistic: - raise RuntimeError("The op_statistics csv file was not found, perhaps the original data was not collected.") + logger.warning("The op_statistics csv file was not found, perhaps the original data was not collected.") if not msprof_json: raise 
RuntimeError("The msprof json file was not found, perhaps the original data was not collected.") diff --git a/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py b/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py index 813e96c0f5ee..1792a8e6eee3 100644 --- a/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py +++ b/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py @@ -1,180 +1,184 @@ -# Copyright 2023 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================ -"""msprof data generate model""" -import csv -import glob -import numpy as np -from mindspore.profiler.common.util import get_newest_file - - -class AscendMsprofDataGenerator: - """Generate ascend data from files.""" - - _mindspore_model_id = 4294967295 - - def __init__(self, mindstudio_profiler_output): - self.mindstudio_profiler_output = mindstudio_profiler_output - self.op_summary = None - self.op_statistic = None - self.steptrace = [] - self.steptrace_model = [] - - self.op_summary_type = [ - ('Model ID', float), - ('Task ID', int), - ('Stream ID', int), - ('Op Name', object), - ('Op Type', object), - ('Task Type', object), - ('Task Start Time', object), - ('Task Duration', float), - ('Task Wait Time', float), - ('Input Shapes', object), - ('Input Data Types', object), - ('Input Formats', object), - ('Output Shapes', object), - ('Output Data Types', object), - ('Output Formats', object), - ('Task Start Time(us)', object) - ] - - self.op_statistic_type = [ - ('Op Type', object), - ('Count', int), - ('Total Time', float), - ] - - self.steptrace_type = [ - ('Iteration ID', int), - ('FP Start', float), - ('BP End', float), - ('Iteration End', float), - ('Iteration Time', float), - ('FP to BP Time', float), - ('Iteration Refresh', float), - ('Data Aug Bound', float), - ('Model ID', float), - ] - - def parse(self): - """read msprof data generate DataFrame data""" - self._read_op_summary() - - self._read_op_statistic() - - self._read_steptrace() - - self.steptrace_model = self.steptrace[self.steptrace['Model ID'] == self._mindspore_model_id] - - self.steptrace = self.steptrace[self.steptrace['Model ID'] != self._mindspore_model_id] - - result = (self.op_summary, self.op_statistic, self.steptrace, self.steptrace_model) - - return result - - def _read_op_summary(self): - """read op summary to memory""" - op_summary = [] - op_summary_name = 
fr'{self.mindstudio_profiler_output}/op_summary_*.csv' - op_summary_file = get_newest_file(glob.glob(op_summary_name))[0] - with open(op_summary_file, newline='') as csvfile: - reader = csv.DictReader(csvfile, delimiter=',', quotechar='"') - for row in reader: - vector_fops = row.get('vector_fops', None) - cube_fops = row.get('cube_fops', None) - aiv_vector_fops = row.get('aiv_vector_fops', None) - aic_cube_fops = row.get('aic_cube_fops', None) - - new_row = [row.get('Model ID'), row.get('Task ID'), row.get('Stream ID'), row.get('Op Name'), - row.get('OP Type'), row.get('Task Type'), row.get('Task Start Time(us)'), - row.get('Task Duration(us)'), row.get('Task Wait Time(us)'), row.get('Input Shapes'), - row.get('Input Data Types'), row.get('Input Formats'), row.get('Output Shapes'), - row.get('Output Data Types'), row.get('Output Formats'), '0.000'] - if vector_fops is not None and cube_fops is not None: - new_row.append(vector_fops) - new_row.append(cube_fops) - elif aiv_vector_fops is not None and aic_cube_fops is not None: - new_row.append(aiv_vector_fops) - new_row.append(aic_cube_fops) - op_summary.append(tuple(['0' if d == 'N/A' else d for d in new_row])) - - if op_summary and len(op_summary[0]) > len(self.op_summary_type): - self.op_summary_type.extend([ - ('vector_fops', float), - ('cube_fops', float) - ]) - - op_summary_dt = np.dtype(self.op_summary_type) - - self.op_summary = np.array(op_summary, dtype=op_summary_dt) - high_acc_time = self.op_summary['Task Start Time'].copy() - self.op_summary['Task Start Time(us)'] = high_acc_time - self.op_summary['Task Start Time'] = self.op_summary['Task Start Time'].astype(float) * 1e-3 - self.op_summary['Task Duration'] = self.op_summary['Task Duration'] * 1e-3 - self.op_summary['Task Wait Time'] = self.op_summary['Task Wait Time'] * 1e-3 - - def _read_op_statistic(self): - """read op statistic to memory""" - op_statistic = [] - op_statistic_name = fr'{self.mindstudio_profiler_output}/op_statistic_*.csv' - 
op_statistic_file = get_newest_file(glob.glob(op_statistic_name))[0] - with open(op_statistic_file, newline='') as csvfile: - reader = csv.DictReader(csvfile, delimiter=',', quotechar='"') - for row in reader: - new_row = ( - row.get('OP Type'), - row.get('Count'), - row.get('Total Time(us)'), - ) - new_row = tuple(['0' if d == 'N/A' else d for d in new_row]) - op_statistic.append(new_row) - - op_statistic_dt = np.dtype(self.op_statistic_type) - self.op_statistic = np.array(op_statistic, dtype=op_statistic_dt) - self.op_statistic['Total Time'] *= 1e-3 - - def _read_steptrace(self): - """read steptrace to memory""" - step_trace = [] - step_trace_name = fr'{self.mindstudio_profiler_output}/step_trace_*.csv' - step_trace_file_list = get_newest_file(glob.glob(step_trace_name)) - for step_trace_file in step_trace_file_list: - with open(step_trace_file, newline='') as csvfile: - reader = csv.DictReader(csvfile, delimiter=',', quotechar='"') - for row in reader: - new_row = [ - row.get('Iteration ID'), - row.get('FP Start(us)'), - row.get('BP End(us)'), - row.get('Iteration End(us)'), - row.get('Iteration Time(us)'), - row.get('FP to BP Time(us)'), - row.get('Iteration Refresh(us)'), - row.get('Data Aug Bound(us)'), - row.get('Model ID'), - ] - step_trace.append(tuple(['0' if i == 'N/A' else i for i in new_row])) - break - - steptrace_dt = np.dtype(self.steptrace_type) - - self.steptrace = np.array(step_trace, dtype=steptrace_dt) - self.steptrace['FP Start'] = self.steptrace['FP Start'] * 1e-3 - self.steptrace['BP End'] = self.steptrace['BP End'] * 1e-3 - self.steptrace['Iteration End'] = self.steptrace['Iteration End'] * 1e-3 - self.steptrace['Iteration Time'] = self.steptrace['Iteration Time'] * 1e-3 - self.steptrace['FP to BP Time'] = self.steptrace['FP to BP Time'] * 1e-3 - self.steptrace['Iteration Refresh'] = self.steptrace['Iteration Refresh'] * 1e-3 - self.steptrace['Data Aug Bound'] = self.steptrace['Data Aug Bound'] * 1e-3 +# Copyright 2023 Huawei Technologies 
Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""msprof data generate model""" +import csv +import glob +import numpy as np +from mindspore.profiler.common.util import get_newest_file + + +class AscendMsprofDataGenerator: + """Generate ascend data from files.""" + + _mindspore_model_id = 4294967295 + + def __init__(self, mindstudio_profiler_output): + self.mindstudio_profiler_output = mindstudio_profiler_output + self.op_summary = None + self.op_statistic = None + self.steptrace = [] + self.steptrace_model = [] + + self.op_summary_type = [ + ('Model ID', float), + ('Task ID', int), + ('Stream ID', int), + ('Op Name', object), + ('Op Type', object), + ('Task Type', object), + ('Task Start Time', object), + ('Task Duration', float), + ('Task Wait Time', float), + ('Input Shapes', object), + ('Input Data Types', object), + ('Input Formats', object), + ('Output Shapes', object), + ('Output Data Types', object), + ('Output Formats', object), + ('Task Start Time(us)', object) + ] + + self.op_statistic_type = [ + ('Op Type', object), + ('Count', int), + ('Total Time', float), + ] + + self.steptrace_type = [ + ('Iteration ID', int), + ('FP Start', float), + ('BP End', float), + ('Iteration End', float), + ('Iteration Time', float), + ('FP to BP Time', float), + ('Iteration Refresh', float), + ('Data Aug Bound', float), + ('Model ID', float), + ] + + def parse(self): + """read msprof 
data generate DataFrame data""" + self._read_op_summary() + + self._read_op_statistic() + + self._read_steptrace() + + self.steptrace_model = self.steptrace[self.steptrace['Model ID'] == self._mindspore_model_id] + + self.steptrace = self.steptrace[self.steptrace['Model ID'] != self._mindspore_model_id] + + result = (self.op_summary, self.op_statistic, self.steptrace, self.steptrace_model) + + return result + + def _read_op_summary(self): + """read op summary to memory""" + op_summary = [] + op_summary_name = fr'{self.mindstudio_profiler_output}/op_summary_*.csv' + op_summary_file = get_newest_file(glob.glob(op_summary_name))[0] + with open(op_summary_file, newline='') as csvfile: + reader = csv.DictReader(csvfile, delimiter=',', quotechar='"') + for row in reader: + vector_fops = row.get('vector_fops', None) + cube_fops = row.get('cube_fops', None) + aiv_vector_fops = row.get('aiv_vector_fops', None) + aic_cube_fops = row.get('aic_cube_fops', None) + + new_row = [row.get('Model ID'), row.get('Task ID'), row.get('Stream ID'), row.get('Op Name'), + row.get('OP Type'), row.get('Task Type'), row.get('Task Start Time(us)'), + row.get('Task Duration(us)'), row.get('Task Wait Time(us)'), row.get('Input Shapes'), + row.get('Input Data Types'), row.get('Input Formats'), row.get('Output Shapes'), + row.get('Output Data Types'), row.get('Output Formats'), '0.000'] + if vector_fops is not None and cube_fops is not None: + new_row.append(vector_fops) + new_row.append(cube_fops) + elif aiv_vector_fops is not None and aic_cube_fops is not None: + new_row.append(aiv_vector_fops) + new_row.append(aic_cube_fops) + op_summary.append(tuple(['0' if d == 'N/A' else d for d in new_row])) + + if op_summary and len(op_summary[0]) > len(self.op_summary_type): + self.op_summary_type.extend([ + ('vector_fops', float), + ('cube_fops', float) + ]) + + op_summary_dt = np.dtype(self.op_summary_type) + + self.op_summary = np.array(op_summary, dtype=op_summary_dt) + high_acc_time = 
self.op_summary['Task Start Time'].copy() + self.op_summary['Task Start Time(us)'] = high_acc_time + self.op_summary['Task Start Time'] = self.op_summary['Task Start Time'].astype(float) * 1e-3 + self.op_summary['Task Duration'] = self.op_summary['Task Duration'] * 1e-3 + self.op_summary['Task Wait Time'] = self.op_summary['Task Wait Time'] * 1e-3 + + def _read_op_statistic(self): + """read op statistic to memory""" + op_statistic = [] + op_statistic_name = fr'{self.mindstudio_profiler_output}/op_statistic_*.csv' + op_statistic_files = glob.glob(op_statistic_name) + if not op_statistic_files: + return + op_statistic_file = get_newest_file(op_statistic_files)[0] + with open(op_statistic_file, newline='') as csvfile: + reader = csv.DictReader(csvfile, delimiter=',', quotechar='"') + for row in reader: + new_row = ( + row.get('OP Type'), + row.get('Count'), + row.get('Total Time(us)'), + ) + new_row = tuple(['0' if d == 'N/A' else d for d in new_row]) + op_statistic.append(new_row) + if not op_statistic: + return + op_statistic_dt = np.dtype(self.op_statistic_type) + self.op_statistic = np.array(op_statistic, dtype=op_statistic_dt) + self.op_statistic['Total Time'] *= 1e-3 + + def _read_steptrace(self): + """read steptrace to memory""" + step_trace = [] + step_trace_name = fr'{self.mindstudio_profiler_output}/step_trace_*.csv' + step_trace_file_list = get_newest_file(glob.glob(step_trace_name)) + for step_trace_file in step_trace_file_list: + with open(step_trace_file, newline='') as csvfile: + reader = csv.DictReader(csvfile, delimiter=',', quotechar='"') + for row in reader: + new_row = [ + row.get('Iteration ID'), + row.get('FP Start(us)'), + row.get('BP End(us)'), + row.get('Iteration End(us)'), + row.get('Iteration Time(us)'), + row.get('FP to BP Time(us)'), + row.get('Iteration Refresh(us)'), + row.get('Data Aug Bound(us)'), + row.get('Model ID'), + ] + step_trace.append(tuple(['0' if i == 'N/A' else i for i in new_row])) + break + + steptrace_dt = 
np.dtype(self.steptrace_type) + + self.steptrace = np.array(step_trace, dtype=steptrace_dt) + self.steptrace['FP Start'] = self.steptrace['FP Start'] * 1e-3 + self.steptrace['BP End'] = self.steptrace['BP End'] * 1e-3 + self.steptrace['Iteration End'] = self.steptrace['Iteration End'] * 1e-3 + self.steptrace['Iteration Time'] = self.steptrace['Iteration Time'] * 1e-3 + self.steptrace['FP to BP Time'] = self.steptrace['FP to BP Time'] * 1e-3 + self.steptrace['Iteration Refresh'] = self.steptrace['Iteration Refresh'] * 1e-3 + self.steptrace['Data Aug Bound'] = self.steptrace['Data Aug Bound'] * 1e-3 \ No newline at end of file diff --git a/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py b/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py index 5fc72480e751..2441ebb3cb8a 100644 --- a/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py +++ b/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py @@ -1,317 +1,321 @@ -# Copyright 2023 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================ -"""op analyse model""" -import csv -import json -import logging -import os -import stat -from typing import Optional, List - -import numpy as np -from mindspore import log as logger -from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException - - -class AscendOPGenerator: - """Generate ascend op data from DataFrame.""" - - def __init__( - self, op_summary: np.ndarray, op_statistic: np.ndarray, - dynamic_status: bool = False, launch_ops: Optional[List] = None - ): - self.op_summary = op_summary - self.op_statistic = op_statistic - self.dynamic_status = dynamic_status - self.op_detail = None - self.op_type = None - self.aicpu_detail = None - self.framework_raw = None - self.output_timeline_data = None - self.launch_ops = launch_ops if launch_ops else [] - self.aclnn_status = bool(launch_ops) - self._full_kernel_name = None - self._sub_graph = None - self._op_name = None - self._kernel_name = None - - self.op_detail_dt = np.dtype( - [('full_kernel_name', object), ('task_duration', float), - ('execution_frequency', int), ('task_type', object)]) - - self.op_type_dt = np.dtype( - [('kernel_type', object), ('total_time', float), ('execution_frequency', int), ('percent', float)]) - - self.aicpu_detail_dt = np.dtype( - [('serial_number', int), ('kernel_type', object), ('total_time', float), ('dispatch_time', float), - ('execution_time', float), ('run_start', float), ('run_end', float)]) - - self.framwork_raw_dt = np.dtype( - [('task_id', int), ('stream_id', int), ('block_dim', int), ('full_kernel_name', object), - ('op_name', object), ('kernel_name', object), ('kernel_type', object), ('subgraph', object), - ('op_info', object), ('model_id', int), ('task_type', object)]) - - def parse(self): - """ - Analyse op summary op statistic generate op data. 
- """ - - self._combine_op_and_kernel(self.op_summary, self.launch_ops) - # aicore intermediation detail - self.op_detail = self._parse_op_detail(self.op_summary) - - # aicore intermediation type - self.op_type = self._parse_op_type(self.op_statistic) - - # aicpu_intermediation - self.aicpu_detail = self._parse_aicpu_detail(self.op_summary) - - # framwork_raw - self.framework_raw = self._parse_framework_raw(self.op_summary) - - self.output_timeline_data = self.op_summary[self.op_summary['Task Type'] == 'AI_CORE'][ - ['Op Name', 'Stream ID', 'Task Start Time', 'Task Duration']] - - def write(self, aicore_intermediate_detail_path, aicore_intermediate_type_path, aicpu_intermediate_detail_path, - framework_raw_path, output_timeline_data_path): - """ - Write the op_intermediate_detail.csv op_intermediate_type.csv aicpu_intermediate.csv and framework_raw.csv. - - Args: - aicore_intermediate_detail_path(str): op_intermediate_detail.csv path. - aicore_intermediate_type_path(str): op_intermediate_type.csv path. - aicpu_intermediate_detail_path(str): aicpu_intermediate.csv path. 
- framework_raw_path: framework_raw.csv path - output_timeline_data_path : output_timeline_data.txt path - """ - # aicore intermediation detail - if self.op_detail.shape[0] != 0: - try: - with os.fdopen(os.open(aicore_intermediate_detail_path, - os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), - 'w') as aicore_detail: - writer = csv.writer(aicore_detail) - writer.writerow(self.op_detail.dtype.names) - writer.writerows(self.op_detail.tolist()) - except (IOError, OSError) as err: - logging.critical('Errot occurred when write aicore detail file: %s', err) - raise ProfilerIOException() from err - if os.path.exists(aicore_intermediate_detail_path): - os.chmod(aicore_intermediate_detail_path, stat.S_IREAD | stat.S_IWRITE) - - # aicore intermediation type - if self.op_type.shape[0] != 0: - try: - with os.fdopen(os.open(aicore_intermediate_type_path, - os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), - 'w') as aicore_type: - writer = csv.writer(aicore_type) - writer.writerow(self.op_type.dtype.names) - writer.writerows(self.op_type.tolist()) - except (IOError, OSError) as err: - logging.critical('Errot occurred when write aicore type file: %s', err) - raise ProfilerIOException() from err - if os.path.exists(aicore_intermediate_type_path): - os.chmod(aicore_intermediate_type_path, stat.S_IREAD | stat.S_IWRITE) - - # aicpu_intermediation - if self.aicpu_detail.shape[0] != 0: - try: - with os.fdopen(os.open(aicpu_intermediate_detail_path, - os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), - 'w') as aicpu_type: - writer = csv.writer(aicpu_type) - writer.writerow(self.aicpu_detail.dtype.names) - writer.writerows(self.aicpu_detail.tolist()) - except (IOError, OSError) as err: - logging.critical('Errot occurred when write aicpu detail file: %s', err) - raise ProfilerIOException() from err - if os.path.exists(aicpu_intermediate_detail_path): - os.chmod(aicpu_intermediate_detail_path, stat.S_IREAD | stat.S_IWRITE) - - # 
framwork_raw - if self.framework_raw.shape[0] != 0: - try: - with os.fdopen(os.open(framework_raw_path, - os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), - 'w') as framework: - writer = csv.writer(framework) - writer.writerow(self.framework_raw.dtype.names) - writer.writerows(self.framework_raw.tolist()) - except (IOError, OSError) as err: - logging.critical('Errot occurred when write framework file: %s', err) - raise ProfilerIOException() from err - if os.path.exists(framework_raw_path): - os.chmod(framework_raw_path, stat.S_IREAD | stat.S_IWRITE) - - # output_timeline_data - if self.output_timeline_data.shape[0] != 0 and output_timeline_data_path: - try: - with os.fdopen(os.open(output_timeline_data_path, - os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), - 'w') as output_timeline_data: - writer = csv.writer(output_timeline_data) - writer.writerow(['kernel_name', 'stream_id', 'start_time(us)', 'duration(ms)']) - writer.writerows(self.output_timeline_data.tolist()) - except (IOError, OSError) as err: - logging.critical('Error occurred when write output timeline data file: %s', err) - raise ProfilerIOException() from err - if os.path.exists(aicpu_intermediate_detail_path): - os.chmod(aicpu_intermediate_detail_path, stat.S_IREAD | stat.S_IWRITE) - - def _combine_op_and_kernel(self, op_summary, launch_ops): - """update op name, kernel name etc.""" - self._full_kernel_name = op_summary['Op Name'].copy() - self._op_name = op_summary['Op Name'].copy() - self._kernel_name = np.array( - [x[-1] for x in np.char.split(op_summary['Op Name'].astype(str), sep='/')], dtype=object) - self._sub_graph = np.array( - [x[0] for x in np.char.split(op_summary['Op Name'].astype(str), sep='/')], dtype=object) - - if launch_ops and len(launch_ops) != len(op_summary): - logger.error("Size mismatch between op_summary and launch_ops!") - launch_ops = [] - - for index, launch_op in enumerate(launch_ops): - if not launch_op: - continue - 
self._op_name[index] = launch_op - self._kernel_name[index] = self._full_kernel_name[index] - self._full_kernel_name[index] = f"{launch_op}/{self._full_kernel_name[index]}" - self._sub_graph[index] = launch_op.split("/")[0] - - def _parse_op_detail(self, op_summary): - """ - Analyse op summary generate op detail data. - - Args: - op_summary(DataFrame): op summary data. - """ - if self.aclnn_status: - op_detail = np.empty((len(op_summary),), dtype=self.op_detail_dt) - op_detail['task_type'] = op_summary['Task Type'] - op_detail['execution_frequency'] = np.ones((len(op_summary),), dtype=int) - op_detail['task_duration'] = op_summary['Task Duration'] - op_detail['full_kernel_name'] = self._full_kernel_name - else: - groups, index, inverse, counts = np.unique(op_summary['Op Name'], return_index=True, - return_inverse=True, return_counts=True) - - op_detail = np.empty((len(groups),), dtype=self.op_detail_dt) - op_detail['full_kernel_name'] = groups - op_detail['task_type'] = op_summary[index]['Task Type'] - nonzero_duration = np.bincount(inverse) != 0 - op_detail['task_duration'] = np.where(nonzero_duration, np.bincount( - inverse, weights=op_summary['Task Duration']) / np.bincount(inverse), 0) - op_detail['execution_frequency'] = counts - - return op_detail - - def _parse_op_type(self, op_statistic): - """ - Analyse op statistic generate op type data. - - Args: - op_statistic(DataFrame): op statistic data. 
- """ - - groups, _, inverse, _ = np.unique(op_statistic['Op Type'], return_index=True, return_inverse=True, - return_counts=True) - - op_type = np.empty((len(groups),), dtype=self.op_type_dt) - op_type['kernel_type'] = groups - op_type['total_time'] = np.bincount(inverse, weights=op_statistic['Total Time']) - op_type['execution_frequency'] = np.bincount(inverse, weights=op_statistic['Count']) - op_type['percent'] = op_type['total_time'] / np.sum(op_statistic['Total Time']) if np.sum( - op_statistic['Total Time']) != 0 else 0 - - return op_type - - def _parse_aicpu_detail(self, op_summary): - """ - Analyse op summary generate aicpu detail data. - - Args: - op_summary(DataFrame): op summary data. - """ - - op_summary = op_summary[op_summary['Task Type'] == 'AI_CPU'] - - aicpu_detail = np.empty((len(op_summary),), dtype=self.aicpu_detail_dt) - - aicpu_detail['serial_number'] = [i for i in range(1, op_summary.shape[0] + 1)] - aicpu_detail['kernel_type'] = op_summary['Op Type'] - aicpu_detail['total_time'] = op_summary['Task Duration'] + op_summary['Task Wait Time'] - aicpu_detail['dispatch_time'] = op_summary['Task Wait Time'] - aicpu_detail['execution_time'] = op_summary['Task Duration'] - aicpu_detail['run_start'] = op_summary['Task Start Time'] - aicpu_detail['run_end'] = aicpu_detail['run_start'] + aicpu_detail['total_time'] - - return aicpu_detail - - def _parse_framework_raw(self, op_summary): - """ - Analyse op summary generate op framework data. - - Args: - op_summary(DataFrame): op summary data. 
- """ - - def op_info_analyse(row): - """generate op info data""" - input_shapes = row['Input Shapes'].replace('"', '').split(';') - input_data_types = row['Input Data Types'].replace('_', '').split(';') - input_formats = row['Input Formats'].replace('_', '').split(';') - output_shapes = row['Output Shapes'].replace('"', '').split(';') - output_data_types = row['Output Data Types'].replace('_', '').split(';') - output_formats = row['Output Formats'].replace('_', '').split(';') - op_info = {} - if isinstance(input_shapes, list) and len(input_shapes) >= 1 and input_shapes[0] != '': - input_size = len(input_shapes) - for i in range(input_size): - op_info[f'Input_{i}'] = { - 'format': input_formats[i], - 'data_type': input_data_types[i], - 'shape': input_shapes[i] - } - if isinstance(output_shapes, list) and len(output_shapes) >= 1 and output_shapes[0] != '': - output_size = len(output_shapes) - for i in range(output_size): - op_info[f'Output_{i}'] = { - 'format': output_formats[i], - 'data_type': output_data_types[i], - 'shape': output_shapes[i] - } - return json.dumps(op_info) - - if self.dynamic_status or self.aclnn_status: - index = list(range(op_summary.shape[0])) - else: - _, index, _, _ = np.unique(op_summary['Op Name'], return_index=True, return_inverse=True, - return_counts=True) - framwork_raw = np.empty((len(index),), dtype=self.framwork_raw_dt) - - framwork_raw['task_id'] = op_summary[index]['Task ID'] - framwork_raw['stream_id'] = op_summary[index]['Stream ID'] - framwork_raw['full_kernel_name'] = self._full_kernel_name[index] - framwork_raw['op_name'] = self._op_name[index] - framwork_raw['kernel_name'] = self._kernel_name[index] - framwork_raw['kernel_type'] = op_summary[index]['Op Type'] - framwork_raw['subgraph'] = self._sub_graph[index] - framwork_raw['op_info'] = [op_info_analyse(x) for x in op_summary[index]] - framwork_raw['model_id'] = op_summary[index]['Model ID'] - framwork_raw['task_type'] = op_summary[index]['Task Type'] - - return 
framwork_raw +# Copyright 2023 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""op analyse model""" +import csv +import json +import logging +import os +import stat +from typing import Optional, List + +import numpy as np +from mindspore import log as logger +from mindspore.profiler.common.exceptions.exceptions import ProfilerIOException + + +class AscendOPGenerator: + """Generate ascend op data from DataFrame.""" + + def __init__( + self, op_summary: np.ndarray, op_statistic: np.ndarray, + dynamic_status: bool = False, launch_ops: Optional[List] = None + ): + self.op_summary = op_summary + self.op_statistic = op_statistic + self.dynamic_status = dynamic_status + self.op_detail = None + self.op_type = None + self.aicpu_detail = None + self.framework_raw = None + self.output_timeline_data = None + self.launch_ops = launch_ops if launch_ops else [] + self.aclnn_status = bool(launch_ops) + self._full_kernel_name = None + self._sub_graph = None + self._op_name = None + self._kernel_name = None + + self.op_detail_dt = np.dtype( + [('full_kernel_name', object), ('task_duration', float), + ('execution_frequency', int), ('task_type', object)]) + + self.op_type_dt = np.dtype( + [('kernel_type', object), ('total_time', float), ('execution_frequency', int), ('percent', float)]) + + self.aicpu_detail_dt = np.dtype( + [('serial_number', int), ('kernel_type', object), 
('total_time', float), ('dispatch_time', float), + ('execution_time', float), ('run_start', float), ('run_end', float)]) + + self.framwork_raw_dt = np.dtype( + [('task_id', int), ('stream_id', int), ('block_dim', int), ('full_kernel_name', object), + ('op_name', object), ('kernel_name', object), ('kernel_type', object), ('subgraph', object), + ('op_info', object), ('model_id', int), ('task_type', object)]) + + def parse(self): + """ + Analyse op summary op statistic generate op data. + """ + + self._combine_op_and_kernel(self.op_summary, self.launch_ops) + # aicore intermediation detail + self.op_detail = self._parse_op_detail(self.op_summary) + + # aicore intermediation type + self.op_type = self._parse_op_type(self.op_statistic) + + # aicpu_intermediation + self.aicpu_detail = self._parse_aicpu_detail(self.op_summary) + + # framwork_raw + self.framework_raw = self._parse_framework_raw(self.op_summary) + + self.output_timeline_data = self.op_summary[self.op_summary['Task Type'] == 'AI_CORE'][ + ['Op Name', 'Stream ID', 'Task Start Time', 'Task Duration']] + + def write(self, aicore_intermediate_detail_path, aicore_intermediate_type_path, aicpu_intermediate_detail_path, + framework_raw_path, output_timeline_data_path): + """ + Write the op_intermediate_detail.csv op_intermediate_type.csv aicpu_intermediate.csv and framework_raw.csv. + + Args: + aicore_intermediate_detail_path(str): op_intermediate_detail.csv path. + aicore_intermediate_type_path(str): op_intermediate_type.csv path. + aicpu_intermediate_detail_path(str): aicpu_intermediate.csv path. 
+            framework_raw_path: framework_raw.csv path
+            output_timeline_data_path : output_timeline_data.txt path
+        """
+        # aicore intermediation detail
+        if isinstance(self.op_detail, np.ndarray) and self.op_detail.shape[0] != 0:
+            try:
+                with os.fdopen(os.open(aicore_intermediate_detail_path,
+                                       os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR),
+                               'w') as aicore_detail:
+                    writer = csv.writer(aicore_detail)
+                    writer.writerow(self.op_detail.dtype.names)
+                    writer.writerows(self.op_detail.tolist())
+            except (IOError, OSError) as err:
+                logging.critical('Error occurred when write aicore detail file: %s', err)
+                raise ProfilerIOException() from err
+        if os.path.exists(aicore_intermediate_detail_path):
+            os.chmod(aicore_intermediate_detail_path, stat.S_IREAD | stat.S_IWRITE)
+
+        # aicore intermediation type
+        if isinstance(self.op_type, np.ndarray) and self.op_type.shape[0] != 0:
+            try:
+                with os.fdopen(os.open(aicore_intermediate_type_path,
+                                       os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR),
+                               'w') as aicore_type:
+                    writer = csv.writer(aicore_type)
+                    writer.writerow(self.op_type.dtype.names)
+                    writer.writerows(self.op_type.tolist())
+            except (IOError, OSError) as err:
+                logging.critical('Error occurred when write aicore type file: %s', err)
+                raise ProfilerIOException() from err
+        if os.path.exists(aicore_intermediate_type_path):
+            os.chmod(aicore_intermediate_type_path, stat.S_IREAD | stat.S_IWRITE)
+
+        # aicpu_intermediation
+        if isinstance(self.aicpu_detail, np.ndarray) and self.aicpu_detail.shape[0] != 0:
+            try:
+                with os.fdopen(os.open(aicpu_intermediate_detail_path,
+                                       os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR),
+                               'w') as aicpu_type:
+                    writer = csv.writer(aicpu_type)
+                    writer.writerow(self.aicpu_detail.dtype.names)
+                    writer.writerows(self.aicpu_detail.tolist())
+            except (IOError, OSError) as err:
+                logging.critical('Error occurred when write aicpu detail file: %s', err)
+                raise ProfilerIOException() from err
+        if
os.path.exists(aicpu_intermediate_detail_path):
+            os.chmod(aicpu_intermediate_detail_path, stat.S_IREAD | stat.S_IWRITE)
+
+        # framwork_raw
+        if isinstance(self.framework_raw, np.ndarray) and self.framework_raw.shape[0] != 0:
+            try:
+                with os.fdopen(os.open(framework_raw_path,
+                                       os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR),
+                               'w') as framework:
+                    writer = csv.writer(framework)
+                    writer.writerow(self.framework_raw.dtype.names)
+                    writer.writerows(self.framework_raw.tolist())
+            except (IOError, OSError) as err:
+                logging.critical('Error occurred when write framework file: %s', err)
+                raise ProfilerIOException() from err
+        if os.path.exists(framework_raw_path):
+            os.chmod(framework_raw_path, stat.S_IREAD | stat.S_IWRITE)
+
+        # output_timeline_data
+        if isinstance(self.output_timeline_data, np.ndarray) and self.output_timeline_data.size and \
+                self.output_timeline_data.shape[0] != 0 and output_timeline_data_path:
+            try:
+                with os.fdopen(os.open(output_timeline_data_path,
+                                       os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR),
+                               'w') as output_timeline_data:
+                    writer = csv.writer(output_timeline_data)
+                    writer.writerow(['kernel_name', 'stream_id', 'start_time(us)', 'duration(ms)'])
+                    writer.writerows(self.output_timeline_data.tolist())
+            except (IOError, OSError) as err:
+                logging.critical('Error occurred when write output timeline data file: %s', err)
+                raise ProfilerIOException() from err
+        if os.path.exists(output_timeline_data_path):
+            os.chmod(output_timeline_data_path, stat.S_IREAD | stat.S_IWRITE)
+
+    def _combine_op_and_kernel(self, op_summary, launch_ops):
+        """update op name, kernel name etc."""
+        self._full_kernel_name = op_summary['Op Name'].copy()
+        self._op_name = op_summary['Op Name'].copy()
+        self._kernel_name = np.array(
+            [x[-1] for x in np.char.split(op_summary['Op Name'].astype(str), sep='/')], dtype=object)
+        self._sub_graph = np.array(
+            [x[0] for x in np.char.split(op_summary['Op Name'].astype(str),
sep='/')], dtype=object) + + if launch_ops and len(launch_ops) != len(op_summary): + logger.error("Size mismatch between op_summary and launch_ops!") + launch_ops = [] + + for index, launch_op in enumerate(launch_ops): + if not launch_op: + continue + self._op_name[index] = launch_op + self._kernel_name[index] = self._full_kernel_name[index] + self._full_kernel_name[index] = f"{launch_op}/{self._full_kernel_name[index]}" + self._sub_graph[index] = launch_op.split("/")[0] + + def _parse_op_detail(self, op_summary): + """ + Analyse op summary generate op detail data. + + Args: + op_summary(DataFrame): op summary data. + """ + if self.aclnn_status: + op_detail = np.empty((len(op_summary),), dtype=self.op_detail_dt) + op_detail['task_type'] = op_summary['Task Type'] + op_detail['execution_frequency'] = np.ones((len(op_summary),), dtype=int) + op_detail['task_duration'] = op_summary['Task Duration'] + op_detail['full_kernel_name'] = self._full_kernel_name + else: + groups, index, inverse, counts = np.unique(op_summary['Op Name'], return_index=True, + return_inverse=True, return_counts=True) + + op_detail = np.empty((len(groups),), dtype=self.op_detail_dt) + op_detail['full_kernel_name'] = groups + op_detail['task_type'] = op_summary[index]['Task Type'] + nonzero_duration = np.bincount(inverse) != 0 + op_detail['task_duration'] = np.where(nonzero_duration, np.bincount( + inverse, weights=op_summary['Task Duration']) / np.bincount(inverse), 0) + op_detail['execution_frequency'] = counts + + return op_detail + + def _parse_op_type(self, op_statistic): + """ + Analyse op statistic generate op type data. + + Args: + op_statistic(DataFrame): op statistic data. 
+ """ + + if isinstance(op_statistic, np.ndarray) and not op_statistic.size or not isinstance(op_statistic, np.ndarray) \ + and not op_statistic: + return None + groups, _, inverse, _ = np.unique(op_statistic['Op Type'], return_index=True, return_inverse=True, + return_counts=True) + + op_type = np.empty((len(groups),), dtype=self.op_type_dt) + op_type['kernel_type'] = groups + op_type['total_time'] = np.bincount(inverse, weights=op_statistic['Total Time']) + op_type['execution_frequency'] = np.bincount(inverse, weights=op_statistic['Count']) + op_type['percent'] = op_type['total_time'] / np.sum(op_statistic['Total Time']) if np.sum( + op_statistic['Total Time']) != 0 else 0 + + return op_type + + def _parse_aicpu_detail(self, op_summary): + """ + Analyse op summary generate aicpu detail data. + + Args: + op_summary(DataFrame): op summary data. + """ + + op_summary = op_summary[op_summary['Task Type'] == 'AI_CPU'] + + aicpu_detail = np.empty((len(op_summary),), dtype=self.aicpu_detail_dt) + + aicpu_detail['serial_number'] = [i for i in range(1, op_summary.shape[0] + 1)] + aicpu_detail['kernel_type'] = op_summary['Op Type'] + aicpu_detail['total_time'] = op_summary['Task Duration'] + op_summary['Task Wait Time'] + aicpu_detail['dispatch_time'] = op_summary['Task Wait Time'] + aicpu_detail['execution_time'] = op_summary['Task Duration'] + aicpu_detail['run_start'] = op_summary['Task Start Time'] + aicpu_detail['run_end'] = aicpu_detail['run_start'] + aicpu_detail['total_time'] + + return aicpu_detail + + def _parse_framework_raw(self, op_summary): + """ + Analyse op summary generate op framework data. + + Args: + op_summary(DataFrame): op summary data. 
+ """ + + def op_info_analyse(row): + """generate op info data""" + input_shapes = row['Input Shapes'].replace('"', '').split(';') + input_data_types = row['Input Data Types'].replace('_', '').split(';') + input_formats = row['Input Formats'].replace('_', '').split(';') + output_shapes = row['Output Shapes'].replace('"', '').split(';') + output_data_types = row['Output Data Types'].replace('_', '').split(';') + output_formats = row['Output Formats'].replace('_', '').split(';') + op_info = {} + if isinstance(input_shapes, list) and len(input_shapes) >= 1 and input_shapes[0] != '': + input_size = len(input_shapes) + for i in range(input_size): + op_info[f'Input_{i}'] = { + 'format': input_formats[i], + 'data_type': input_data_types[i], + 'shape': input_shapes[i] + } + if isinstance(output_shapes, list) and len(output_shapes) >= 1 and output_shapes[0] != '': + output_size = len(output_shapes) + for i in range(output_size): + op_info[f'Output_{i}'] = { + 'format': output_formats[i], + 'data_type': output_data_types[i], + 'shape': output_shapes[i] + } + return json.dumps(op_info) + + if self.dynamic_status or self.aclnn_status: + index = list(range(op_summary.shape[0])) + else: + _, index, _, _ = np.unique(op_summary['Op Name'], return_index=True, return_inverse=True, + return_counts=True) + framwork_raw = np.empty((len(index),), dtype=self.framwork_raw_dt) + + framwork_raw['task_id'] = op_summary[index]['Task ID'] + framwork_raw['stream_id'] = op_summary[index]['Stream ID'] + framwork_raw['full_kernel_name'] = self._full_kernel_name[index] + framwork_raw['op_name'] = self._op_name[index] + framwork_raw['kernel_name'] = self._kernel_name[index] + framwork_raw['kernel_type'] = op_summary[index]['Op Type'] + framwork_raw['subgraph'] = self._sub_graph[index] + framwork_raw['op_info'] = [op_info_analyse(x) for x in op_summary[index]] + framwork_raw['model_id'] = op_summary[index]['Model ID'] + framwork_raw['task_type'] = op_summary[index]['Task Type'] + + return 
framwork_raw \ No newline at end of file -- Gitee