diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc index 06e675513918502b697596b7d9b84c2d7fc9cdb9..06fc193e78c8549ec22bec4cbf53afa7b11cec8a 100644 --- a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc +++ b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.cc @@ -160,7 +160,6 @@ uint64_t Utils::GetPid() { template void RingBuffer::Init(size_t capacity) { capacity_ = capacity; - mask_ = capacity_ - 1; data_queue_.resize(capacity); is_inited_ = true; is_quit_ = false; @@ -174,7 +173,6 @@ void RingBuffer::UnInit() { write_index_ = 0; idle_write_index_ = 0; capacity_ = 0; - mask_ = 0; is_quit_ = true; is_inited_ = false; } @@ -182,37 +180,32 @@ void RingBuffer::UnInit() { template size_t RingBuffer::Size() { - size_t curr_read_index = read_index_.load(std::memory_order_relaxed); - size_t curr_write_index = write_index_.load(std::memory_order_relaxed); - if (curr_read_index > curr_write_index) { - return capacity_ - (curr_read_index & mask_) + (curr_write_index & mask_); + size_t curr_read_index = read_index_.load(std::memory_order_acquire); + size_t curr_write_index = write_index_.load(std::memory_order_acquire); + if (curr_read_index >= curr_write_index) { + return 0; } return curr_write_index - curr_read_index; } +template +bool RingBuffer::Full() { + size_t curr_write_index = write_index_.load(std::memory_order_acquire); + if (curr_write_index >= capacity_) { + return true; + } else { + return false; + } +} + template bool RingBuffer::Push(T data) { size_t curr_write_index = 0; - size_t next_write_index = 0; - size_t cycles = 0; - do { - if (!is_inited_ || is_quit_) { - return false; - } - cycles++; - if (cycles >= 1024) { - return false; - } - size_t curr_read_index = read_index_.load(std::memory_order_relaxed); - curr_write_index = idle_write_index_.load(std::memory_order_relaxed); - next_write_index = curr_write_index + 1; - if ((next_write_index & mask_) == (curr_read_index & mask_)) { - return false; - } - } while (!idle_write_index_.compare_exchange_weak(curr_write_index, next_write_index)); - size_t index = curr_write_index & mask_; - data_queue_[index] = std::move(data); - write_index_++; + curr_write_index = write_index_.fetch_add(1, std::memory_order_acquire); + if (curr_write_index >= capacity_) { + return false; + } + data_queue_[curr_write_index] = std::move(data); return true; } @@ -221,23 +214,27 @@ T RingBuffer::Pop() { if (!is_inited_) { return nullptr; } - size_t curr_read_index = read_index_.load(std::memory_order_relaxed); - size_t curr_write_index = write_index_.load(std::memory_order_relaxed); - if ((curr_read_index & mask_) == (curr_write_index & mask_) && !is_quit_) { + size_t curr_read_index = read_index_.fetch_add(1, std::memory_order_acquire); + size_t curr_write_index = write_index_.load(std::memory_order_acquire); + if (curr_read_index >= curr_write_index || curr_read_index >= capacity_) { return nullptr; } - size_t index = curr_read_index & mask_; - T data = std::move(data_queue_[index]); - read_index_++; + T data = std::move(data_queue_[curr_read_index]); return data; } +template +void RingBuffer::Reset() { + write_index_ = 0; + read_index_ = 0; +} + ProfilingDataDumper::ProfilingDataDumper() : path_(""), start_(false), init_(false) {} ProfilingDataDumper::~ProfilingDataDumper() { UnInit(); } void ProfilingDataDumper::Init(const std::string &path, size_t capacity) { - MS_LOG(INFO) << "init profiling data dumper."; + MS_LOG(INFO) << "init profiling data dumper, capacity: " << capacity; path_ = path; data_chunk_buf_.Init(capacity); init_.store(true); @@ -263,10 +260,6 @@ void ProfilingDataDumper::Start() { if (!init_.load() || !Utils::CreateDir(path_)) { return; } - if (Thread::Start() != 0) { - MS_LOG(ERROR) << "profiling data dumper thread start failed."; - return; - } start_.store(true); } @@ -274,7 +267,6 @@ void ProfilingDataDumper::Stop() { MS_LOG(INFO) << "stop profiling data dumper."; if (start_.load() == true) { start_.store(false); - Thread::Stop(); } Flush(); } @@ -302,30 +294,34 @@ void ProfilingDataDumper::GatherAndDumpData() { } } -void ProfilingDataDumper::Run() { - for (;;) { - if (!start_.load()) { - break; - } - if (data_chunk_buf_.Size() > kNotifyInterval) { - GatherAndDumpData(); - } else { - usleep(kMaxWaitTimeUs); - } - } -} - void ProfilingDataDumper::Flush() { - while (data_chunk_buf_.Size() != 0) { + MS_LOG(INFO) << "data_chunk_buf_.Size: " << data_chunk_buf_.Size(); + while (data_chunk_buf_.Size() > 0) { GatherAndDumpData(); } + data_chunk_buf_.Reset(); } void ProfilingDataDumper::Report(std::unique_ptr data) { if (!start_.load() || data == nullptr) { return; } - data_chunk_buf_.Push(std::move(data)); + int i = 0; + while (is_flush_.load() && i < 10) { + usleep(kMaxWaitTimeUs); + i++; + } + if (!data_chunk_buf_.Push(std::move(data))) { + is_flush_.store(true); + std::lock_guard flush_lock_(flush_mutex_); + if (data_chunk_buf_.Full()) { + Flush(); + } + is_flush_.store(false); + if (!data_chunk_buf_.Push(std::move(data))) { + MS_LOG(ERROR) << "profiling data Report failed."; + } + } } void ProfilingDataDumper::Dump(const std::map> &dataMap) { diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h index 3d93e34b4f8f088a1aae43bb2064afab0d7790ef..5b9bf6a636ec182287177c25fdbe5d3c0699d6c7 100644 --- a/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h +++ b/mindspore/ccsrc/plugin/device/ascend/hal/profiler/profiling_data_dumper.h @@ -36,10 +36,10 @@ namespace mindspore { namespace profiler { namespace ascend { -constexpr uint32_t kDefaultRingBuffer = 1024; +constexpr uint32_t kDefaultRingBuffer = 1000 * 1000; constexpr uint32_t kBatchMaxLen = 5 * 1024 * 1024; // 5 MB -constexpr uint32_t kMaxWaitTimeUs = 1000 * 1000; -constexpr uint32_t kNotifyInterval = 1000; +constexpr uint32_t kMaxWaitTimeUs = 100 * 1000; +constexpr uint32_t kMaxWaitTimes = 10; class Utils { public: @@ -75,6 +75,8 @@ class RingBuffer { size_t Size(); bool Push(T data); T Pop(); + bool Full(); + void Reset(); private: bool is_inited_; @@ -87,53 +89,6 @@ class RingBuffer { std::vector data_queue_; }; -class Thread { - public: - Thread() : is_alive_(false), pid_(0), thread_name_("NPUProfiler") {} - - ~Thread() { - if (is_alive_) { - (void)pthread_cancel(pid_); - (void)pthread_join(pid_, nullptr); - } - } - - void SetThreadName(const std::string &name) { - if (!name.empty()) { - thread_name_ = name; - } - } - - std::string GetThreadName() { return thread_name_; } - - int Start() { - int ret = pthread_create(&pid_, nullptr, Execute, reinterpret_cast(this)); - is_alive_ = (ret == 0) ? true : false; - return ret; - } - - int Stop() { return Join(); } - - int Join() { - int ret = pthread_join(pid_, nullptr); - is_alive_ = (ret == 0) ? false : true; - return ret; - } - - private: - static void *Execute(void *args) { - Thread *thr = reinterpret_cast(args); - thr->Run(); - return nullptr; - } - virtual void Run() = 0; - - private: - bool is_alive_; - pthread_t pid_; - std::string thread_name_; -}; - struct BaseReportData { int32_t device_id{0}; std::string tag; @@ -142,7 +97,7 @@ struct BaseReportData { virtual std::vector encode() = 0; }; -class ProfilingDataDumper : public Thread { +class ProfilingDataDumper { public: ProfilingDataDumper(); virtual ~ProfilingDataDumper(); @@ -151,6 +106,7 @@ class ProfilingDataDumper : public Thread { void Report(std::unique_ptr data); void Start(); void Stop(); + void Flush(); static std::shared_ptr &GetInstance() { static std::shared_ptr instance = std::make_shared(); @@ -158,7 +114,6 @@ class ProfilingDataDumper : public Thread { } private: - void Flush(); void Dump(const std::map> &dataMap); void Run(); void GatherAndDumpData(); @@ -167,8 +122,10 @@ class ProfilingDataDumper : public Thread { std::string path_; std::atomic start_; std::atomic init_; + std::atomic is_flush_{false}; RingBuffer> data_chunk_buf_; std::map fd_map_; + std::mutex flush_mutex_; }; } // namespace ascend diff --git a/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py b/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py index 6c93bb90707a1159f286b951f62c5886df15a038..61075e173c343c724f58a473f2c1e9c0c6e76c56 100644 --- a/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py +++ b/mindspore/python/mindspore/profiler/parser/ascend_msprof_exporter.py @@ -253,7 +253,7 @@ class AscendMsprofExporter: if not op_summary: raise RuntimeError("The op_summary csv file was not found, perhaps the original data was not collected.") if not op_statistic: - raise RuntimeError("The op_statistics csv file was not found, perhaps the original data was not collected.") + logger.warning("The op_statistics csv file was not found, perhaps the original data was not collected.") if not msprof_json: raise RuntimeError("The msprof json file was not found, perhaps the original data was not collected.") diff --git a/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py b/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py index 813e96c0f5ee3f221124f7e1ebc4459d03d18634..a2b316b2700e6d29628809fa73e25848be1cc9ee 100644 --- a/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py +++ b/mindspore/python/mindspore/profiler/parser/ascend_msprof_generator.py @@ -129,7 +129,10 @@ class AscendMsprofDataGenerator: """read op statistic to memory""" op_statistic = [] op_statistic_name = fr'{self.mindstudio_profiler_output}/op_statistic_*.csv' - op_statistic_file = get_newest_file(glob.glob(op_statistic_name))[0] + op_statistic_files = glob.glob(op_statistic_name) + if not op_statistic_files: + return + op_statistic_file = get_newest_file(op_statistic_files)[0] with open(op_statistic_file, newline='') as csvfile: reader = csv.DictReader(csvfile, delimiter=',', quotechar='"') for row in reader: @@ -140,7 +143,8 @@ class AscendMsprofDataGenerator: ) new_row = tuple(['0' if d == 'N/A' else d for d in new_row]) op_statistic.append(new_row) - + if not op_statistic: + return op_statistic_dt = np.dtype(self.op_statistic_type) self.op_statistic = np.array(op_statistic, dtype=op_statistic_dt) self.op_statistic['Total Time'] *= 1e-3 diff --git a/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py b/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py index 5fc72480e751f5a2ed8873e94d89e573683c0659..7f3d0b1f7234af267c0d2fdefa149f323f5d9345 100644 --- a/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py +++ b/mindspore/python/mindspore/profiler/parser/ascend_op_generator.py @@ -97,7 +97,7 @@ class AscendOPGenerator: output_timeline_data_path : output_timeline_data.txt path """ # aicore intermediation detail - if self.op_detail.shape[0] != 0: + if isinstance(self.op_detail, np.ndarray) and self.op_detail.shape[0] != 0: try: with os.fdopen(os.open(aicore_intermediate_detail_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), @@ -112,7 +112,7 @@ class AscendOPGenerator: os.chmod(aicore_intermediate_detail_path, stat.S_IREAD | stat.S_IWRITE) # aicore intermediation type - if self.op_type.shape[0] != 0: + if isinstance(self.op_type, np.ndarray) and self.op_type.shape[0] != 0: try: with os.fdopen(os.open(aicore_intermediate_type_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), @@ -127,7 +127,7 @@ class AscendOPGenerator: os.chmod(aicore_intermediate_type_path, stat.S_IREAD | stat.S_IWRITE) # aicpu_intermediation - if self.aicpu_detail.shape[0] != 0: + if isinstance(self.aicpu_detail, np.ndarray) and self.aicpu_detail.shape[0] != 0: try: with os.fdopen(os.open(aicpu_intermediate_detail_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), @@ -142,7 +142,7 @@ class AscendOPGenerator: os.chmod(aicpu_intermediate_detail_path, stat.S_IREAD | stat.S_IWRITE) # framwork_raw - if self.framework_raw.shape[0] != 0: + if isinstance(self.framework_raw, np.ndarray) and self.framework_raw.shape[0] != 0: try: with os.fdopen(os.open(framework_raw_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR), @@ -157,7 +157,8 @@ class AscendOPGenerator: os.chmod(framework_raw_path, stat.S_IREAD | stat.S_IWRITE) # output_timeline_data - if self.output_timeline_data.shape[0] != 0 and output_timeline_data_path: + if isinstance(self.output_timeline_data, np.ndarray) and self.output_timeline_data.size and \ + self.output_timeline_data.shape[0] != 0 and output_timeline_data_path: try: with os.fdopen(os.open(output_timeline_data_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IWUSR | stat.S_IRUSR),