18#include "ReaderCallbacks.hpp"
19#include "src/models/communication/Communication.hpp"
20#include "src/models/Slot.hpp"
21#include "lib/otf2xx/include/otf2xx/otf2.hpp"
22#include "src/models/communication/BlockingSendEvent.hpp"
23#include "src/models/communication/BlockingReceivEevent.hpp"
24#include "src/models/communication/NonBlockingSendEvent.hpp"
25#include <QStringListModel>
31 slots_(std::vector<
Slot*>()),
46 return this->program_end_ - this->program_start_;
49void ReaderCallbacks::definition(
const otf2::definition::location &loc) {
50 rdr_.register_location(loc);
53void ReaderCallbacks::event(
const otf2::definition::location &,
const otf2::event::program_begin &event) {
54 this->program_start_ =
event.timestamp();
57void ReaderCallbacks::event(
const otf2::definition::location &,
const otf2::event::program_end &event) {
58 this->program_end_ =
event.timestamp();
62void ReaderCallbacks::event(
const otf2::definition::location &loc,
const otf2::event::enter &event) {
63 auto start =
event.timestamp() - this->program_start_;
65 Slot::Builder builder{};
66 auto region =
new otf2::definition::region(event.region());
67 auto location =
new otf2::definition::location(loc);
68 builder.start(start)->location(location)->region(region);
70 std::vector<Slot::Builder> *builders;
71 auto buildersIt = this->slotsBuilding.find(location->ref().get());
72 if (buildersIt == this->slotsBuilding.end()) {
73 builders =
new std::vector<Slot::Builder>();
74 this->slotsBuilding.insert({location->ref().get(), builders});
76 builders = buildersIt->second;
79 builders->push_back(builder);
// Handles a region leave event.
//
// Fetches the builder list stored in slotsBuilding for this location, takes
// its last element, computes the leave time relative to program_start_ and
// appends a newly built Slot to slots_.
//
// NOTE(review): several statements of this function are not visible in this
// excerpt (e.g. where `end` is handed to the builder and whether the builder
// is removed from the list afterwards) - confirm against the full source.
82void ReaderCallbacks::event(
const otf2::definition::location &location,
const otf2::event::leave &event) {
// Lookup via at(): presumably fails loudly when no enter was recorded for
// this location - depends on the container type of slotsBuilding.
83 auto builders = this->slotsBuilding.at(location.ref().get());
// The most recently pushed builder is the one being completed.
85 Slot::Builder &builder = builders->back();
// Leave time expressed relative to the program start.
87 auto end =
event.timestamp() - this->program_start_;
90 this->slots_.push_back(
new Slot(builder.build()));
// Pairs a point-to-point communication event with a pending counterpart, or
// parks it until the counterpart shows up.
//
//   self            - the event being processed
//   matching        - key under which a counterpart is looked up in matchingPending
//   selfPending     - pending events of the same direction as `self`
//   matchingPending - pending events of the opposite direction
//
// NOTE(review): the template header, the end of the signature and a few
// statements (construction of `communication`, the branch separators) are
// not visible in this excerpt; the comments below describe only what is shown.
97void ReaderCallbacks::communicationEvent(T* self, uint32_t matching,
98 std::map<uint32_t, std::vector<CommunicationEvent*> *> &selfPending,
99 std::map<uint32_t, std::vector<CommunicationEvent*> *> &matchingPending
// A counterpart is pending under `matching`: use the last one in its list.
102 if (matchingPending.contains(matching)) {
103 auto& matchingEvents = matchingPending[matching];
104 auto matchingEvent = matchingEvents->back();
107 communications_.push_back(communication);
// The counterpart is consumed; drop the list once it becomes empty.
109 matchingEvents->pop_back();
110 if (matchingEvents->empty()) {
111 delete matchingEvents;
112 matchingPending.erase(matching);
// Presumably the no-counterpart branch: store `self` under its own location
// reference, creating the list on first use.
115 std::vector<CommunicationEvent*> *matchingEvents;
116 auto id = self->getLocation()->ref().get();
117 if (selfPending.contains(
id)) {
118 matchingEvents = selfPending[id];
120 matchingEvents =
new std::vector<CommunicationEvent*>();
121 selfPending[id] = matchingEvents;
124 matchingEvents->push_back(self);
// Handles a blocking MPI send event.
//
// Copies the location and the communicator onto the heap and passes the send
// event object `ev` to communicationEvent, using the receiver as the key to
// match against pendingReceives and pendingSends as the own pending list.
//
// NOTE(review): the construction of `ev` is not visible in this excerpt.
128void ReaderCallbacks::event(
const otf2::definition::location &loc,
const otf2::event::mpi_send &send) {
129 auto location =
new otf2::definition::location(loc);
130 auto comm =
new types::communicator(send.comm());
133 this->communicationEvent<BlockingSendEvent>(ev, send.receiver(), pendingSends, pendingReceives);
// Handles a blocking MPI receive event.
//
// Copies the location and the communicator onto the heap and passes the
// receive event object `ev` to communicationEvent, using the sender as the key
// to match against pendingSends and pendingReceives as the own pending list.
//
// NOTE(review): the construction of `ev` is not visible in this excerpt.
136void ReaderCallbacks::event(
const otf2::definition::location &loc,
const otf2::event::mpi_receive &receive) {
137 auto location =
new otf2::definition::location(loc);
138 auto comm =
new types::communicator(receive.comm());
141 this->communicationEvent(ev, receive.sender(), pendingReceives, pendingSends);
144void ReaderCallbacks::event(
const otf2::definition::location &location,
const otf2::event::mpi_isend_request &request) {
145 NonBlockingSendEvent::Builder builder;
146 auto comm =
new types::communicator (request.comm());
147 auto loc =
new otf2::definition::location(location);
148 auto start = relative(request.timestamp());
149 auto receiver = request.receiver();
150 builder.communicator(comm);
151 builder.location(loc);
152 builder.start(start);
153 builder.receiver(receiver);
155 this->uncompletedRequests.insert({request.request_id(), builder});
// Handles the completion of a non-blocking MPI send.
//
// Throws std::logic_error when no request with the completed id is stored in
// uncompletedRequests, or when the stored builder is not a
// NonBlockingSendEvent::Builder. Otherwise it computes the completion time
// relative to the program start and passes `ev` to communicationEvent, matching
// on the receiver recorded in the builder.
//
// NOTE(review): the return type line, the construction of `ev` and any
// clean-up of uncompletedRequests are not visible in this excerpt.
159ReaderCallbacks::event(
const otf2::definition::location &,
const otf2::event::mpi_isend_complete &complete) {
160 if (!uncompletedRequests.contains(complete.request_id())) {
161 throw std::logic_error(
"Found a mpi_isend_complete event with no matching mpi_isend_request event!");
// Copies the stored variant; its alternative is checked before extraction.
164 auto builderVariant = uncompletedRequests[complete.request_id()];
165 if(!holds_alternative<NonBlockingSendEvent::Builder>(builderVariant)) {
166 throw std::logic_error(
"mpi_isend_complete event completes an mpi_ireceive event!");
169 auto builder = get<NonBlockingSendEvent::Builder>(builderVariant);
171 auto end = relative(complete.timestamp());
176 communicationEvent(ev, builder.receiver(), pendingSends, pendingReceives);
// Handles the completion of a non-blocking MPI receive.
//
// Throws std::logic_error when no request with the completed id is stored in
// uncompletedRequests, or when the stored builder is not a
// NonBlockingReceiveEvent::Builder. Otherwise it computes the completion time
// relative to the program start and passes `ev` to communicationEvent, matching
// on the sender recorded in the builder.
//
// NOTE(review): the return type line, the construction of `ev` and any
// clean-up of uncompletedRequests are not visible in this excerpt.
180ReaderCallbacks::event(
const otf2::definition::location &,
const otf2::event::mpi_ireceive_complete &complete) {
181 if (!uncompletedRequests.contains(complete.request_id())) {
182 throw std::logic_error(
"Found a mpi_ireceive_complete event with no matching mpi_ireceive_request event!");
// Copies the stored variant; its alternative is checked before extraction.
185 auto builderVariant = uncompletedRequests[complete.request_id()];
186 if(!holds_alternative<NonBlockingReceiveEvent::Builder>(builderVariant)) {
187 throw std::logic_error(
"mpi_ireceive_complete event completes an mpi_isend event!");
190 auto builder = get<NonBlockingReceiveEvent::Builder>(builderVariant);
192 auto end = relative(complete.timestamp());
197 communicationEvent(ev, builder.sender(), pendingReceives, pendingSends);
201ReaderCallbacks::event(
const otf2::definition::location &location,
const otf2::event::mpi_ireceive_request &request) {
203 NonBlockingReceiveEvent::Builder builder;
204 auto comm =
new types::communicator (request.comm());
205 auto loc =
new otf2::definition::location(location);
206 auto start = relative(request.timestamp());
207 auto sender = request.sender();
208 builder.communicator(comm);
209 builder.location(loc);
210 builder.start(start);
211 builder.sender(sender);
213 this->uncompletedRequests.insert({request.request_id(), builder});
216void ReaderCallbacks::event(
const otf2::definition::location &location,
const otf2::event::mpi_request_test &test) {
217 callback::event(location, test);
220void ReaderCallbacks::event(
const otf2::definition::location &location,
221 const otf2::event::mpi_request_cancelled &cancelled) {
222 callback::event(location, cancelled);
226ReaderCallbacks::event(
const otf2::definition::location &location,
const otf2::event::mpi_collective_begin &begin) {
227 CollectiveCommunicationEvent::Member::Builder builder;
228 auto loc =
new otf2::definition::location(location);
229 auto start = relative(begin.timestamp());
231 builder.location(loc);
232 builder.start(start);
234 this->ongoingCollectiveCommunicationMembers.insert({loc->ref().get(), builder});
// Handles the end of a collective MPI operation on one location.
//
// The first end event of an operation creates the shared
// CollectiveCommunicationEvent::Builder and fills it with an empty member
// list, a copy of the location, the communicator, the operation type and the
// root. Each end event then looks up the member builder registered for its
// location and removes it from ongoingCollectiveCommunicationMembers; once no
// members remain, the finished event is appended to collectiveCommunications_
// and the shared builder is deleted and reset.
//
// NOTE(review): the statements that apply `end` to the member, add the member
// to the shared builder and create `event` are not visible in this excerpt.
237void ReaderCallbacks::event(
const otf2::definition::location &location,
const otf2::event::mpi_collective_end &anEnd) {
// No collective operation in progress yet: set up the shared builder.
238 if(ongoingCollectiveCommunication ==
nullptr) {
239 ongoingCollectiveCommunication =
new CollectiveCommunicationEvent::Builder();
240 std::vector<CollectiveCommunicationEvent::Member*> members;
241 auto loc =
new otf2::definition::location( location);
242 auto comm =
new types::communicator (anEnd.comm());
243 auto operation = anEnd.type();
244 auto root = anEnd.root();
245 ongoingCollectiveCommunication->members(members);
246 ongoingCollectiveCommunication->location(loc);
247 ongoingCollectiveCommunication->communicator(comm);
248 ongoingCollectiveCommunication->operation(operation);
249 ongoingCollectiveCommunication->root(root);
// NOTE(review): operator[] is used for the lookup; with standard map types
// this would default-insert an entry if no begin was recorded - confirm.
252 auto member = ongoingCollectiveCommunicationMembers[location.ref().get()];
253 auto end = relative(anEnd.timestamp());
257 ongoingCollectiveCommunicationMembers.erase(location.ref().get());
// All registered members have ended: the operation is complete.
260 if(ongoingCollectiveCommunicationMembers.empty()){
262 collectiveCommunications_.push_back(event);
263 delete ongoingCollectiveCommunication;
264 ongoingCollectiveCommunication =
nullptr;
// Called once all events have been delivered.
//
// Sorts slots_ ascending by startTime and communications_ ascending by the
// start time of their start event, then walks the bookkeeping containers
// (slotsBuilding, pendingSends, pendingReceives) and finally calls
// std::destroy on them and on uncompletedRequests.
//
// NOTE(review): the loop bodies and the header of the second comparator are
// not visible in this excerpt.
269void ReaderCallbacks::events_done(
const otf2::reader::reader &) {
// The parameter names are swapped (first argument is called rhs), but the
// comparison still orders by ascending startTime.
270 std::sort(this->slots_.begin(), this->slots_.end(), [](
Slot *rhs,
Slot *lhs) {
271 return rhs->startTime < lhs->startTime;
273 std::sort(this->communications_.begin(), this->communications_.end(),
275 return rhs->getStartEvent()->getStartTime() < lhs->getStartEvent()->getStartTime();
278 for (
const auto &item: this->slotsBuilding) {
282 for (
const auto &item: this->pendingSends) {
286 for (
const auto &item: this->pendingReceives) {
// NOTE(review): std::destroy only runs the destructors of the elements in the
// range; it does not remove them from the container, so the containers still
// hold the (now destroyed) elements afterwards. clear() looks like what was
// intended here - confirm.
294 std::destroy(this->slotsBuilding.begin(), this->slotsBuilding.end());
295 std::destroy(this->pendingSends.begin(), this->pendingSends.end());
296 std::destroy(this->pendingReceives.begin(), this->pendingReceives.end());
297 std::destroy(this->uncompletedRequests.begin(), this->uncompletedRequests.end());
300otf2::chrono::duration ReaderCallbacks::relative(otf2::chrono::time_point timepoint)
const {
301 return timepoint - program_start_;
305 return communications_;
309 return collectiveCommunications_;
Class representing the blocking receive event.
Class representing the blocking send event.
A class representing a member of a collective operation.
A class representing an MPI collective operation.
Class representing any (successful or unsuccessful) communication.
Class representing the non blocking receive event.
Class representing the non blocking send event.
std::vector< Slot * > getSlots()
Returns all read slots.
std::vector< Communication * > getCommunications()
Returns all read point to point communications.
ReaderCallbacks(otf2::reader::reader &rdr)
Creates a new instance of the ReaderCallbacks class.
otf2::chrono::duration duration() const
std::vector< CollectiveCommunicationEvent * > getCollectiveCommunications()
Returns all read collective communications.
A Slot represents a visual slot to be rendered in the UI. It contains the information of a location.