Skip to content

Commit

Permalink
Merge pull request #278 from JeffersonLab/nbrei_timeslices
Browse files Browse the repository at this point in the history
Add support for timeslices
  • Loading branch information
nathanwbrei authored Mar 16, 2024
2 parents e488b9d + 6787236 commit 523c0a5
Show file tree
Hide file tree
Showing 47 changed files with 2,338 additions and 104 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ if (${USE_XERCES})
endif()

if (${USE_ASAN})
add_compile_options(-fsanitize=address -g -O2)
add_compile_options(-fsanitize=address)
add_link_options(-fsanitize=address)
endif()

if (${USE_TSAN})
add_compile_options(-fsanitize=thread -g -O2)
add_compile_options(-fsanitize=thread)
add_link_options(-fsanitize=thread)
endif()

Expand Down
3 changes: 2 additions & 1 deletion src/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ add_subdirectory(BlockExample)
add_subdirectory(SubeventExample)
add_subdirectory(SubeventCUDAExample)
add_subdirectory(UnitTestingExample)
add_subdirectory(PodioExample)
add_subdirectory(PodioExample)
add_subdirectory(TimesliceExample)
2 changes: 0 additions & 2 deletions src/examples/StreamingExample/ZmqMain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ void InitPlugin(JApplication *app) {
app->Add(new AHitAnomalyDetector(app, 5000));
app->Add(new JFactoryGeneratorT<AHitParser>());

// So we don't have to put this on the cmd line every time
app->SetParameterValue("jana:legacy_mode", 0);
app->SetParameterValue("jana:extended_report", 0);

new std::thread(dummy_publisher_loop);
Expand Down
4 changes: 2 additions & 2 deletions src/examples/SubeventExample/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set (SubeventExample_SOURCES
add_executable(SubeventExample ${SubeventExample_SOURCES})

target_link_libraries(SubeventExample jana2)
set_target_properties(SubeventExample PROPERTIES PREFIX "" OUTPUT_NAME "SubeventExample" SUFFIX ".so")
install(TARGETS SubeventExample DESTINATION plugins)
set_target_properties(SubeventExample PROPERTIES PREFIX "" OUTPUT_NAME "SubeventExample")
install(TARGETS SubeventExample DESTINATION programs)


16 changes: 16 additions & 0 deletions src/examples/TimesliceExample/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@


if (USE_PODIO)
set (TimesliceExample_SOURCES
TimesliceExample.cc
)

add_library(TimesliceExample SHARED ${TimesliceExample_SOURCES})
target_link_libraries(TimesliceExample jana2 podio::podio PodioExampleDatamodel PodioExampleDatamodelDict podio::podioRootIO)
set_target_properties(TimesliceExample PROPERTIES PREFIX "" SUFFIX ".so" OUTPUT_NAME "TimesliceExample")
install(TARGETS TimesliceExample DESTINATION programs)

else()
message(STATUS "Skipping examples/TimesliceExample because USE_PODIO=Off")
endif()

18 changes: 18 additions & 0 deletions src/examples/TimesliceExample/MyDataModel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once

#include <JANA/JObject.h>

struct MyHit : public JObject {
int hit_id;
int energy, x, y;
};

struct MyCluster : public JObject {
int cluster_id;
int energy, x, y;
std::vector<int> hits;
};

39 changes: 39 additions & 0 deletions src/examples/TimesliceExample/MyEventFactory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@



// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JFactoryT.h>
#include <JANA/Omni/JOmniFactory.h>


struct MyClusterFactory : public JFactoryT<MyCluster> {

int init_call_count = 0;
int change_run_call_count = 0;
int process_call_count = 0;

MyClusterFactory() {
SetLevel(JEventLevel::Event);
}

void Init() override {
++init_call_count;
}

void ChangeRun(const std::shared_ptr<const JEvent>&) override {
++change_run_call_count;
}

void Process(const std::shared_ptr<const JEvent>& event) override {
++process_call_count;

auto protos = event->Get<MyCluster>("protos");
// TODO: Output something sensible
}
};


35 changes: 35 additions & 0 deletions src/examples/TimesliceExample/MyEventProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JEventProcessor.h>

struct ExampleEventProcessor : public JEventProcessor {

std::mutex m_mutex;

ExampleTimesliceProcessor() {
SetEventLevel(JEvent::Level::Event);
}

void Process(const std::shared_ptr<const JEvent>& event) {

std::lock_guard<std::mutex> guard(m_mutex);

auto outputs = event->Get<MyOutput>();
// assert(outputs.size() == 4);
// assert(outputs[0]->z == 25.6f);
// assert(outputs[1]->z == 26.5f);
// assert(outputs[2]->z == 27.4f);
// assert(outputs[3]->z == 28.3f);
LOG << " Contents of event " << event->GetEventNumber() << LOG_END;
for (auto output : outputs) {
LOG << " " << output->evt << ":" << output->sub << " " << output->z << LOG_END;
}
LOG << " DONE with contents of event " << event->GetEventNumber() << LOG_END;
}
};


37 changes: 37 additions & 0 deletions src/examples/TimesliceExample/MyTimesliceFactory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"

#include <JANA/Omni/JOmniFactory.h>
#include <JANA/JFactoryT.h>


struct MyProtoClusterFactory : public JFactoryT<MyCluster> {

int init_call_count = 0;
int change_run_call_count = 0;
int process_call_count = 0;

MyProtoClusterFactory() {
SetLevel(JEventLevel::Timeslice);
}

void Init() override {
++init_call_count;
}

void ChangeRun(const std::shared_ptr<const JEvent>&) override {
++change_run_call_count;
}

void Process(const std::shared_ptr<const JEvent>& event) override {
++process_call_count;

auto protos = event->Get<MyCluster>("protos");
// TODO: Output something sensible
}
};


40 changes: 40 additions & 0 deletions src/examples/TimesliceExample/MyTimesliceSource.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JEventSource.h>


struct MyTimesliceSource : public JEventSource {

MyTimesliceSource(std::string source_name, JApplication *app) : JEventSource(source_name, app) {
SetLevel(JEventLevel::Timeslice);
}

static std::string GetDescription() { return "MyTimesliceSource"; }

std::string GetType(void) const override { return JTypeInfo::demangle<decltype(*this)>(); }

void Open() override { }

void GetEvent(std::shared_ptr<JEvent> event) override {

auto evt = event->GetEventNumber();
std::vector<MyInput*> inputs;
inputs.push_back(new MyInput(22,3.6,evt,0));
inputs.push_back(new MyInput(23,3.5,evt,1));
inputs.push_back(new MyInput(24,3.4,evt,2));
inputs.push_back(new MyInput(25,3.3,evt,3));
inputs.push_back(new MyInput(26,3.2,evt,4));
event->Insert(inputs);

auto hits = std::make_unique<ExampleHitCollection>();
hits.push_back(ExampleHit(22));
hits.push_back(ExampleHit(23));
hits.push_back(ExampleHit(24));
event->InsertCollection(hits);

jout << "MyTimesliceSource: Emitting " << event->GetEventNumber() << jendl;
}
};
43 changes: 43 additions & 0 deletions src/examples/TimesliceExample/MyTimesliceUnfolder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include "MyDataModel.h"
#include <JANA/JEventUnfolder.h>

struct ExampleTimesliceUnfolder : public JEventUnfolder {

MyTimesliceUnfolder() {
SetParentLevel(JEventLevel::Timeslice);
SetChildLevel(JEventLevel::Event);
}

void Preprocess(const JEvent& parent) const override {
parent->Get<MyCluster>("protos");
}

Result Unfold(const JEvent& parent, JEvent& child, int item) override {
auto protos = parent->Get<MyCluster>("protos");

child.SetEventNumber(parent.GetEventNumber()*10 + item);
LOG << "Unfolding parent=" << parent.GetEventNumber() << ", child=" << child.GetEventNumber() << ", item=" << item << LOG_END;

std::vector<MyCluster*> child_protos;
for (auto proto: protos) {
if (true) {
// TODO: condition
child_protos.push_back(proto);
}
}
child->Insert(child_protos, "event_protos")->SetFactoryFlag(JFactoryFlag::NOT_OBJECT_OWNER);

if (item == 3) {
jout << "Unfold found item 3, finishing join" << jendl;
return Result::Finished;
}
return Result::KeepGoing;
}
}



30 changes: 30 additions & 0 deletions src/examples/TimesliceExample/TimesliceExample.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.


#include "MyTimesliceSource.h"
#include "MyTimesliceUnfolder.h"
#include "MyEventProcessor.h"
#include "MyTimesliceFactory.h"
#include "MyEventFactory.h"

#include <JANA/JApplication.h>


extern "C"{
void InitPlugin(JApplication *app) {

InitJANAPlugin(app);

app->Add(new MyTimesliceSource("Dummy"));
app->Add(new MyTimesliceUnfolder);
app->Add(new MyEventProcessor);

app->Add(new JFactoryGeneratorT<MyTimesliceFactory>());
app->Add(new JFactoryGeneratorT<MyEventFactory>());

app->SetParameterValue("jana:extended_report", 0);
}
} // "C"


4 changes: 4 additions & 0 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ set(JANA2_SOURCES
Engine/JEventSourceArrow.h
Engine/JBlockSourceArrow.h
Engine/JBlockDisentanglerArrow.h
Engine/JEventMapArrow.h
Engine/JEventMapArrow.cc
Engine/JPool.h

Engine/JMailbox.h
Expand Down Expand Up @@ -244,6 +246,7 @@ file(GLOB jana_calibs_headers "Calibrations/*.h*")
file(GLOB jana_cli_headers "CLI/*.h*")
file(GLOB jana_compat_headers "Compatibility/*.h*")
file(GLOB jana_podio_headers "Podio/*.h*")
file(GLOB jana_omni_headers "Omni/*.h*")

install(FILES ${jana_headers} DESTINATION include/JANA)
install(FILES ${jana_engine_headers} DESTINATION include/JANA/Engine)
Expand All @@ -254,6 +257,7 @@ install(FILES ${jana_utils_headers} DESTINATION include/JANA/Utils)
install(FILES ${jana_calibs_headers} DESTINATION include/JANA/Calibrations)
install(FILES ${jana_cli_headers} DESTINATION include/JANA/CLI)
install(FILES ${jana_compat_headers} DESTINATION include/JANA/Compatibility)
install(FILES ${jana_omni_headers} DESTINATION include/JANA/Omni)

if (${USE_PODIO})
install(FILES ${jana_podio_headers} DESTINATION include/JANA/Podio)
Expand Down
7 changes: 6 additions & 1 deletion src/libraries/JANA/Engine/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class JArrow {
const std::string m_name; // Used for human understanding
const bool m_is_parallel; // Whether or not it is safe to parallelize
const bool m_is_source; // Whether or not this arrow should activate/drain the topology
const bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties
Expand All @@ -39,6 +39,7 @@ class JArrow {

friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows
friend class JTopologyBuilder;
std::vector<PlaceRefBase*> m_places; // Will eventually supplant m_listeners, m_chunksize

protected:
Expand All @@ -57,6 +58,10 @@ class JArrow {
m_logger = logger;
}

void set_is_sink(bool is_sink) {
m_is_sink = is_sink;
}

// TODO: Get rid of me
void set_chunksize(size_t chunksize) {
std::lock_guard<std::mutex> lock(m_arrow_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Engine/JArrowTopology.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct JArrowTopology {
// Ensure that ComponentManager stays alive at least as long as JArrowTopology does
// Otherwise there is a potential use-after-free when JArrowTopology or JArrowProcessingController access components

std::shared_ptr<JEventPool> event_pool; // TODO: Move into pools eventually
JEventPool* event_pool; // TODO: Move into pools eventually
JPerfMetrics metrics;

std::vector<JArrow*> arrows;
Expand Down
6 changes: 3 additions & 3 deletions src/libraries/JANA/Engine/JBlockDisentanglerArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class JBlockDisentanglerArrow : public JArrow {
JBlockedEventSource<T>* m_source; // non-owning
JMailbox<T*>* m_block_queue; // non-owning
JMailbox<std::shared_ptr<JEvent>*>* m_event_queue; // non-owning
std::shared_ptr<JEventPool> m_pool;
JEventPool* m_pool;

size_t m_max_events_per_block = 40;

Expand All @@ -23,13 +23,13 @@ class JBlockDisentanglerArrow : public JArrow {
JBlockedEventSource<T>* source,
JMailbox<T*>* block_queue,
JMailbox<std::shared_ptr<JEvent>*>* event_queue,
std::shared_ptr<JEventPool> pool
JEventPool* pool
)
: JArrow(std::move(name), true, false, false, 1)
, m_source(source)
, m_block_queue(block_queue)
, m_event_queue(event_queue)
, m_pool(std::move(pool))
, m_pool(pool)
{}

~JBlockDisentanglerArrow() {
Expand Down
Loading

0 comments on commit 523c0a5

Please sign in to comment.