Motiv
Marvelous OTF2 Traces Interactive Visualizer
Loading...
Searching...
No Matches
ReaderCallbacks.cpp
1/*
2 * Marvelous OTF2 Traces Interactive Visualizer (MOTIV)
3 * Copyright (C) 2023 Florian Gallrein, Björn Gehrke
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
#include "ReaderCallbacks.hpp"

#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

#include <QStringListModel>

#include "lib/otf2xx/include/otf2xx/otf2.hpp"
#include "src/models/communication/Communication.hpp"
#include "src/models/Slot.hpp"
#include "src/models/communication/BlockingSendEvent.hpp"
#include "src/models/communication/BlockingReceivEevent.hpp"
#include "src/models/communication/NonBlockingSendEvent.hpp"
29
30ReaderCallbacks::ReaderCallbacks(otf2::reader::reader &rdr) :
31 slots_(std::vector<Slot*>()),
32 communications_(std::vector<Communication*>()),
33 collectiveCommunications_(std::vector<CollectiveCommunicationEvent*>()),
34 slotsBuilding(),
35 program_start_(),
36 rdr_(rdr) {
37
38}
39
40
41std::vector<Slot*> ReaderCallbacks::getSlots() {
42 return this->slots_;
43}
44
45otf2::chrono::duration ReaderCallbacks::duration() const {
46 return this->program_end_ - this->program_start_;
47}
48
49void ReaderCallbacks::definition(const otf2::definition::location &loc) {
50 rdr_.register_location(loc);
51}
52
53void ReaderCallbacks::event(const otf2::definition::location &, const otf2::event::program_begin &event) {
54 this->program_start_ = event.timestamp();
55}
56
57void ReaderCallbacks::event(const otf2::definition::location &, const otf2::event::program_end &event) {
58 this->program_end_ = event.timestamp();
59}
60
61
62void ReaderCallbacks::event(const otf2::definition::location &loc, const otf2::event::enter &event) {
63 auto start = event.timestamp() - this->program_start_;
64
65 Slot::Builder builder{};
66 auto region = new otf2::definition::region(event.region());
67 auto location = new otf2::definition::location(loc);
68 builder.start(start)->location(location)->region(region);
69
70 std::vector<Slot::Builder> *builders;
71 auto buildersIt = this->slotsBuilding.find(location->ref().get());
72 if (buildersIt == this->slotsBuilding.end()) {
73 builders = new std::vector<Slot::Builder>();
74 this->slotsBuilding.insert({location->ref().get(), builders});
75 } else {
76 builders = buildersIt->second;
77 }
78
79 builders->push_back(builder);
80}
81
82void ReaderCallbacks::event(const otf2::definition::location &location, const otf2::event::leave &event) {
83 auto builders = this->slotsBuilding.at(location.ref().get());
84
85 Slot::Builder &builder = builders->back();
86
87 auto end = event.timestamp() - this->program_start_;
88 builder.end(end);
89
90 this->slots_.push_back(new Slot(builder.build()));
91
92 builders->pop_back();
93}
94
95
96template<typename T>
97void ReaderCallbacks::communicationEvent(T* self, uint32_t matching,
98 std::map<uint32_t, std::vector<CommunicationEvent*> *> &selfPending,
99 std::map<uint32_t, std::vector<CommunicationEvent*> *> &matchingPending
100) {
101 // Check for a pending matching call
102 if (matchingPending.contains(matching)) {
103 auto& matchingEvents = matchingPending[matching];
104 auto matchingEvent = matchingEvents->back();
105
106 auto communication = new Communication(matchingEvent, self);
107 communications_.push_back(communication);
108
109 matchingEvents->pop_back();
110 if (matchingEvents->empty()) {
111 delete matchingEvents;
112 matchingPending.erase(matching);
113 }
114 } else {
115 std::vector<CommunicationEvent*> *matchingEvents;
116 auto id = self->getLocation()->ref().get();
117 if (selfPending.contains(id)) {
118 matchingEvents = selfPending[id];
119 } else {
120 matchingEvents = new std::vector<CommunicationEvent*>();
121 selfPending[id] = matchingEvents;
122 }
123
124 matchingEvents->push_back(self);
125 }
126}
127
128void ReaderCallbacks::event(const otf2::definition::location &loc, const otf2::event::mpi_send &send) {
129 auto location = new otf2::definition::location(loc);
130 auto comm = new types::communicator(send.comm());
131 auto ev = new BlockingSendEvent(relative(send.timestamp()), location, comm);
132
133 this->communicationEvent<BlockingSendEvent>(ev, send.receiver(), pendingSends, pendingReceives);
134}
135
136void ReaderCallbacks::event(const otf2::definition::location &loc, const otf2::event::mpi_receive &receive) {
137 auto location = new otf2::definition::location(loc);
138 auto comm = new types::communicator(receive.comm());
139 auto ev = new BlockingReceiveEvent(relative(receive.timestamp()), location, comm);
140
141 this->communicationEvent(ev, receive.sender(), pendingReceives, pendingSends);
142}
143
144void ReaderCallbacks::event(const otf2::definition::location &location, const otf2::event::mpi_isend_request &request) {
145 NonBlockingSendEvent::Builder builder;
146 auto comm = new types::communicator (request.comm());
147 auto loc = new otf2::definition::location(location);
148 auto start = relative(request.timestamp());
149 auto receiver = request.receiver();
150 builder.communicator(comm);
151 builder.location(loc);
152 builder.start(start);
153 builder.receiver(receiver);
154
155 this->uncompletedRequests.insert({request.request_id(), builder});
156}
157
158void
159ReaderCallbacks::event(const otf2::definition::location &, const otf2::event::mpi_isend_complete &complete) {
160 if (!uncompletedRequests.contains(complete.request_id())) {
161 throw std::logic_error("Found a mpi_isend_complete event with no matching mpi_isend_request event!");
162 }
163
164 auto builderVariant = uncompletedRequests[complete.request_id()];
165 if(!holds_alternative<NonBlockingSendEvent::Builder>(builderVariant)) {
166 throw std::logic_error("mpi_isend_complete event completes an mpi_ireceive event!");
167 }
168
169 auto builder = get<NonBlockingSendEvent::Builder>(builderVariant);
170
171 auto end = relative(complete.timestamp());
172 builder.end(end);
173
174 auto ev = new NonBlockingSendEvent(builder.build());
175
176 communicationEvent(ev, builder.receiver(), pendingSends, pendingReceives);
177}
178
179void
180ReaderCallbacks::event(const otf2::definition::location &, const otf2::event::mpi_ireceive_complete &complete) {
181 if (!uncompletedRequests.contains(complete.request_id())) {
182 throw std::logic_error("Found a mpi_ireceive_complete event with no matching mpi_ireceive_request event!");
183 }
184
185 auto builderVariant = uncompletedRequests[complete.request_id()];
186 if(!holds_alternative<NonBlockingReceiveEvent::Builder>(builderVariant)) {
187 throw std::logic_error("mpi_ireceive_complete event completes an mpi_isend event!");
188 }
189
190 auto builder = get<NonBlockingReceiveEvent::Builder>(builderVariant);
191
192 auto end = relative(complete.timestamp());
193 builder.end(end);
194
195 auto ev = new NonBlockingReceiveEvent(builder.build());
196
197 communicationEvent(ev, builder.sender(), pendingReceives, pendingSends);
198}
199
200void
201ReaderCallbacks::event(const otf2::definition::location &location, const otf2::event::mpi_ireceive_request &request) {
202
203 NonBlockingReceiveEvent::Builder builder;
204 auto comm = new types::communicator (request.comm());
205 auto loc = new otf2::definition::location(location);
206 auto start = relative(request.timestamp());
207 auto sender = request.sender();
208 builder.communicator(comm);
209 builder.location(loc);
210 builder.start(start);
211 builder.sender(sender);
212
213 this->uncompletedRequests.insert({request.request_id(), builder});
214}
215
216void ReaderCallbacks::event(const otf2::definition::location &location, const otf2::event::mpi_request_test &test) {
217 callback::event(location, test);
218}
219
220void ReaderCallbacks::event(const otf2::definition::location &location,
221 const otf2::event::mpi_request_cancelled &cancelled) {
222 callback::event(location, cancelled);
223}
224
225void
226ReaderCallbacks::event(const otf2::definition::location &location, const otf2::event::mpi_collective_begin &begin) {
227 CollectiveCommunicationEvent::Member::Builder builder;
228 auto loc = new otf2::definition::location(location);
229 auto start = relative(begin.timestamp());
230
231 builder.location(loc);
232 builder.start(start);
233
234 this->ongoingCollectiveCommunicationMembers.insert({loc->ref().get(), builder});
235}
236
237void ReaderCallbacks::event(const otf2::definition::location &location, const otf2::event::mpi_collective_end &anEnd) {
238 if(ongoingCollectiveCommunication == nullptr) {
239 ongoingCollectiveCommunication = new CollectiveCommunicationEvent::Builder();
240 std::vector<CollectiveCommunicationEvent::Member*> members;
241 auto loc = new otf2::definition::location( location);
242 auto comm = new types::communicator (anEnd.comm());
243 auto operation = anEnd.type();
244 auto root = anEnd.root();
245 ongoingCollectiveCommunication->members(members);
246 ongoingCollectiveCommunication->location(loc);
247 ongoingCollectiveCommunication->communicator(comm);
248 ongoingCollectiveCommunication->operation(operation);
249 ongoingCollectiveCommunication->root(root);
250 }
251
252 auto member = ongoingCollectiveCommunicationMembers[location.ref().get()];
253 auto end = relative(anEnd.timestamp());
254 member.end(end);
255
256 ongoingCollectiveCommunication->members()->push_back(new CollectiveCommunicationEvent::Member(member.build()));
257 ongoingCollectiveCommunicationMembers.erase(location.ref().get());
258
259 // If the map is now empty, all ranks have completed the collective operation and the communication event can be build
260 if(ongoingCollectiveCommunicationMembers.empty()){
261 auto event = new CollectiveCommunicationEvent(ongoingCollectiveCommunication->build());
262 collectiveCommunications_.push_back(event);
263 delete ongoingCollectiveCommunication;
264 ongoingCollectiveCommunication = nullptr;
265 }
266}
267
268
269void ReaderCallbacks::events_done(const otf2::reader::reader &) {
270 std::sort(this->slots_.begin(), this->slots_.end(), [](Slot *rhs, Slot *lhs) {
271 return rhs->startTime < lhs->startTime;
272 });
273 std::sort(this->communications_.begin(), this->communications_.end(),
274 [](Communication *rhs, Communication *lhs) {
275 return rhs->getStartEvent()->getStartTime() < lhs->getStartEvent()->getStartTime();
276 });
277
278 for (const auto &item: this->slotsBuilding) {
279 // TODO: Warn about uncomplete slots
280 delete item.second;
281 }
282 for (const auto &item: this->pendingSends) {
283 // TODO: Warn about unmatched sends
284 delete item.second;
285 }
286 for (const auto &item: this->pendingReceives) {
287 // TODO: Warn about unmatched receives
288 delete item.second;
289 }
290// for(const auto &item: this->uncompletedRequests) {
291// // TODO: Warn about uncompleted (and not cancelled) requests
292// }
293
294 std::destroy(this->slotsBuilding.begin(), this->slotsBuilding.end());
295 std::destroy(this->pendingSends.begin(), this->pendingSends.end());
296 std::destroy(this->pendingReceives.begin(), this->pendingReceives.end());
297 std::destroy(this->uncompletedRequests.begin(), this->uncompletedRequests.end());
298}
299
300otf2::chrono::duration ReaderCallbacks::relative(otf2::chrono::time_point timepoint) const {
301 return timepoint - program_start_;
302}
303
304std::vector<Communication*> ReaderCallbacks::getCommunications() {
305 return communications_;
306}
307
308std::vector<CollectiveCommunicationEvent*> ReaderCallbacks::getCollectiveCommunications() {
309 return collectiveCommunications_;
310}
Class representing the blocking receive event.
Class representing the blocking send event.
A class representing a member of a collective operation.
A class representing an MPI collective operation.
Class representing any (successful or unsuccessful) communication.
Class representing the non blocking receive event.
Class representing the non blocking send event.
std::vector< Slot * > getSlots()
Returns all read slots.
std::vector< Communication * > getCommunications()
Returns all read point to point communications.
ReaderCallbacks(otf2::reader::reader &rdr)
Creates a new instance of the ReaderCallbacks class.
otf2::chrono::duration duration() const
std::vector< CollectiveCommunicationEvent * > getCollectiveCommunications()
Returns all read collective communications.
A Slot represents a visual slot to be rendered in the UI. It contains the information of a location.
Definition: Slot.hpp:37