Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Revert partially constructed segments on-error in segment init function #361

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ef06972
Don't skip this test as it's perfectly valid and passes, the reference…
dagardner-nv Aug 2, 2023
79d144b
Test for issue #360
dagardner-nv Aug 2, 2023
52968bf
Remove unneeded copy/paste code
dagardner-nv Aug 2, 2023
1178c02
Revert added objects and ingress/egress ports on error in segment ini…
dagardner-nv Aug 2, 2023
ac2853c
Better test name
dagardner-nv Aug 2, 2023
6a26e4f
wip
dagardner-nv Aug 2, 2023
cccef95
Only run iwyu on compilation units actually changed in PR
dagardner-nv Aug 2, 2023
6914da7
Checks require clang
dagardner-nv Aug 2, 2023
dbeeedc
Revert "Checks require clang"
dagardner-nv Aug 2, 2023
faf233c
test [no ci]
dagardner-nv Aug 2, 2023
9fa907c
Don't fetch base branch if it's already set
dagardner-nv Aug 2, 2023
81d094f
Add comment explaining version
dagardner-nv Aug 2, 2023
9d55259
Avoid 15.0.7
dagardner-nv Aug 2, 2023
1eb89f7
revert clang version changes [no ci]
dagardner-nv Aug 3, 2023
c0f8a35
Adopt updated versions of boost and libhwloc this <hand wavy> choose…
dagardner-nv Aug 3, 2023
46ac7b5
Apply fix from mdemoret-nv:mdd_force-stubgen-dev
dagardner-nv Aug 3, 2023
a6244c9
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Aug 18, 2023
f285c41
Merge branch 'branch-23.11' into david-inconsistent-pipe [no ci]
dagardner-nv Sep 25, 2023
f6503db
Merge branch 'branch-23.11' into david-inconsistent-pipe
dagardner-nv Sep 25, 2023
0f1bf1c
Merge branch 'david-inconsistent-pipe' of github.com:dagardner-nv/MRC…
dagardner-nv Sep 25, 2023
1cd28f6
IWYU changes
dagardner-nv Sep 25, 2023
743d4d3
Merge branch 'branch-23.11' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Sep 25, 2023
dfa066e
Python repro for issue #360
dagardner-nv Sep 26, 2023
9e9a5f7
Place executor.join under exception handler [no ci]
dagardner-nv Sep 26, 2023
38c0e15
Fix repro test, works if segment2 raises, but fails os segment1 raise…
dagardner-nv Sep 26, 2023
22220a2
Add second test, with exception being thrown in both the first and se…
dagardner-nv Sep 26, 2023
06d28a6
WIP: A bunch of debug logging that needs to be removed, add a destroy…
dagardner-nv Sep 27, 2023
1d12c2f
WIP: Reset m_owned_edge from release_edge_connection
dagardner-nv Sep 27, 2023
22205c0
WIP: more debug logging than you can shake a stick at [no ci]
dagardner-nv Sep 27, 2023
142e9b8
WIP: tests passing [no ci]
dagardner-nv Sep 28, 2023
a0f65fd
Release edge created in constructor [no ci]
dagardner-nv Sep 28, 2023
2289f42
Remove debug logging
dagardner-nv Sep 28, 2023
5d0d6a9
WIP: DO NOT MERGE
dagardner-nv Sep 28, 2023
7d69c5d
Enable logging in test [no ci]
dagardner-nv Sep 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/conda/environments/clang_env.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# Licensed under the Apache License, Version 2.0 (the License);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# distributed under the License is distributed on an AS IS BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Expand Down
15 changes: 15 additions & 0 deletions cpp/mrc/include/mrc/edge/edge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ class EdgeBase
m_disconnectors.clear();
};

// Diagnostic helper: writes this object's address and the number of entries in m_linked_edges to
// the INFO log, followed by one line per entry showing the pointer returned by its get().
void log_edges()
{
    LOG(INFO) << "EdgeBase: " << this << " has " << m_linked_edges.size() << " linked edges";

    for (const auto& linked : m_linked_edges)
    {
        LOG(INFO) << " Linked edge: " << linked.get();
    }
}

bool is_connected() const
{
return m_is_connected;
Expand Down Expand Up @@ -193,6 +202,12 @@ class Edge : public virtual EdgeBase
friend class EdgeHolder;
template <typename, typename>
friend class MultiEdgeHolder;

void log_edges()
{
LOG(INFO) << "Edge: " << this;
EdgeBase::log_edges();
}
};

class EdgeTypeInfo
Expand Down
6 changes: 6 additions & 0 deletions cpp/mrc/include/mrc/edge/edge_builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ struct EdgeBuilder final
template <typename SourceT, typename SinkT = SourceT, bool AllowNarrowingV = true>
static void make_edge_writable(IWritableAcceptor<SourceT>& source, IWritableProvider<SinkT>& sink)
{
LOG(INFO) << "\t\tcreating writable edge";
constexpr bool IsConvertable = std::is_convertible_v<SourceT, SinkT>;
constexpr bool LessBits = sizeof(SourceT) > sizeof(SinkT); // Sink requires more bits than source.
constexpr bool FloatToInt = std::is_floating_point_v<SourceT> && std::is_integral_v<SinkT>; // float -> int
Expand Down Expand Up @@ -126,6 +127,7 @@ struct EdgeBuilder final
LOG(FATAL) << "No dynamic lookup available for statically typed objects";
}

LOG(INFO) << "\t\twritable edge = " << edge.get() << "\t handle = " << edge->get_handle().get();
source.set_writable_edge_handle(edge);
}

Expand Down Expand Up @@ -173,6 +175,7 @@ struct EdgeBuilder final
LOG(FATAL) << "No dynamic lookup available for statically typed objects";
}

LOG(INFO) << "\t\treadable edge = " << edge.get() << "\t handle = " << edge->get_handle().get();
sink.set_readable_edge_handle(edge);
}

Expand Down Expand Up @@ -227,6 +230,8 @@ struct EdgeBuilder final
auto edge_handle = edge_holder.get_connected_edge();
edge_holder.release_edge_connection();

LOG(INFO) << "Splicing edge " << edge_handle.get() << " into " << splice_writable_acceptor;

make_edge_writable(*writable_acceptor, *splice_writable_provider);
make_edge_writable(*splice_writable_acceptor, sink);
}
Expand Down Expand Up @@ -463,6 +468,7 @@ void make_edge(SourceT& source, SinkT& sink)
using source_full_t = SourceT;
using sink_full_t = SinkT;

LOG(INFO) << "\tedge_builder::make_edge";
if constexpr (is_base_of_template<edge::IWritableAcceptor, source_full_t>::value &&
is_base_of_template<edge::IWritableProvider, sink_full_t>::value)
{
Expand Down
21 changes: 19 additions & 2 deletions cpp/mrc/include/mrc/edge/edge_holder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,27 @@ template <typename T>
class EdgeHolder
{
public:
EdgeHolder() = default;
EdgeHolder()
{
LOG(INFO) << "EdgeHolder() = " << this;
};

virtual ~EdgeHolder()
{
auto p = m_connected_edge.get();
// Drop any edge connections before this object goes out of scope. This should execute any disconnectors
m_connected_edge.reset();

if (this->check_active_connection(false))
{
LOG(FATAL) << "A node was destructed which still had dependent connections. Nodes must be kept alive while "
LOG(INFO) << "edge = " << p << " edge_holder=" << this;
if (p)
{
p->log_edges();
}
LOG(FATAL) << this
<< " A node was destructed which still had dependent connections. Nodes must be kept alive "
"while "
"dependent connections are still active";
}
}
Expand All @@ -69,6 +81,7 @@ class EdgeHolder
// Alive connection exists when the lock is true, lifetime is false or a connection object has been set
if (m_owned_edge.lock() && !m_owned_edge_lifetime)
{
LOG(INFO) << "check_active_connection = " << this << " m_owned_edge.lock() && !m_owned_edge_lifetime";
// Then someone is using this edge already, cant be changed
if (do_throw)
{
Expand All @@ -80,6 +93,8 @@ class EdgeHolder
// Check for set connections. Must be connected to throw error
if (m_connected_edge && m_connected_edge->is_connected())
{
LOG(INFO) << "check_active_connection = " << this
<< " m_connected_edge && m_connected_edge->is_connected()";
// Then someone is using this edge already, cant be changed
if (do_throw)
{
Expand Down Expand Up @@ -152,8 +167,10 @@ class EdgeHolder

// Drops the edge state held by this object: logs the holder's address at INFO, then resets
// m_owned_edge_lifetime, m_connected_edge and m_owned_edge, in that order.
void release_edge_connection()
{
LOG(INFO) << "Releasing edge connection for edge_holder=" << this;
m_owned_edge_lifetime.reset();
m_connected_edge.reset();
// NOTE(review): m_owned_edge is queried with lock() elsewhere in this class, so this presumably
// clears a weak reference so later active-connection checks do not see a stale edge — confirm.
m_owned_edge.reset();
}

const std::shared_ptr<Edge<T>>& get_connected_edge() const
Expand Down
14 changes: 14 additions & 0 deletions cpp/mrc/include/mrc/manifold/composite_manifold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ class CompositeManifold : public Manifold
mrc::make_edge(*m_ingress, *m_egress);
}

// Logs the manifold's info() string at INFO and then runs shutdown() before the members are torn
// down. shutdown() is declared final in this class, so the call made here resolves to this class's
// own implementation.
~CompositeManifold() override
{
    LOG(INFO) << "CompositeManifold::~CompositeManifold(): " << info();

    shutdown();
}

void shutdown() final
{
LOG(INFO) << "CompositeManifold::shutdown(): " << info();
m_ingress->shutdown();
m_egress->shutdown();
LOG(INFO) << "CompositeManifold::shutdown(): done";
}

protected:
IngressT& ingress()
{
Expand Down
7 changes: 7 additions & 0 deletions cpp/mrc/include/mrc/manifold/egress.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct EgressDelegate
{
virtual ~EgressDelegate() = default;
virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0;
virtual void shutdown(){};
};

template <typename T>
Expand All @@ -55,6 +56,12 @@ class TypedEgress : public EgressDelegate
template <typename T>
class RoundRobinEgress : public node::Router<SegmentAddress, T>, public TypedEgress<T>
{
public:
void shutdown() final
{
node::Router<SegmentAddress, T>::release_edge_connections();
}

protected:
SegmentAddress determine_key_for_value(const T& t) override
{
Expand Down
7 changes: 7 additions & 0 deletions cpp/mrc/include/mrc/manifold/ingress.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct IngressDelegate
{
virtual ~IngressDelegate() = default;
virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0;
virtual void shutdown(){};
};

template <typename T>
Expand All @@ -51,6 +52,12 @@ class TypedIngress : public IngressDelegate
template <typename T>
class MuxedIngress : public node::Muxer<T>, public TypedIngress<T>
{
public:
void shutdown() final
{
node::SourceProperties<T>::release_edge_connection();
}

protected:
void do_add_input(const SegmentAddress& address, edge::IWritableAcceptor<T>* source) final
{
Expand Down
6 changes: 4 additions & 2 deletions cpp/mrc/include/mrc/manifold/interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ struct Interface
virtual ~Interface() = default;

virtual const PortName& port_name() const = 0;
virtual const std::string& info() const = 0;

virtual void start() = 0;
virtual void join() = 0;
virtual void start() = 0;
virtual void join() = 0;
virtual void shutdown() = 0;

virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0;
virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0;
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/include/mrc/manifold/manifold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ class Manifold : public Interface
~Manifold() override;

const PortName& port_name() const final;
const std::string& info() const final;
void shutdown() override;

protected:
runnable::IRunnableResources& resources();

const std::string& info() const;

private:
void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) final;
void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) final;
Expand Down
7 changes: 6 additions & 1 deletion cpp/mrc/include/mrc/node/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ class Queue : public WritableProvider<T>, public ReadableProvider<T>
{
this->set_channel(std::make_unique<mrc::channel::BufferedChannel<T>>());
}
~Queue() override = default;

// Releases the queue's edge connections on destruction: the SinkProperties<T> side first, then the
// SourceProperties<T> side.
~Queue() override
{
    this->SinkProperties<T>::release_edge_connection();
    this->SourceProperties<T>::release_edge_connection();
}

void set_channel(std::unique_ptr<mrc::channel::Channel<T>> channel)
{
Expand Down
18 changes: 17 additions & 1 deletion cpp/mrc/include/mrc/node/rx_sink_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ class RxSinkBase : public WritableProvider<T>, public ReadableAcceptor<T>, publi
public:
void sink_add_watcher(std::shared_ptr<WatcherInterface> watcher);
void sink_remove_watcher(std::shared_ptr<WatcherInterface> watcher);
void release_edge_connection();

protected:
RxSinkBase();
~RxSinkBase() override = default;
~RxSinkBase() override;

const rxcpp::observable<T>& observable() const;

Expand All @@ -70,6 +71,21 @@ RxSinkBase<T>::RxSinkBase() :
this->set_channel(std::make_unique<mrc::channel::BufferedChannel<T>>());
}

// Destructor: releases this sink's edge connection via RxSinkBase<T>::release_edge_connection(),
// bracketed by INFO trace messages.
template <typename T>
RxSinkBase<T>::~RxSinkBase()
{
    LOG(INFO) << "~RxSinkBase()";

    RxSinkBase<T>::release_edge_connection();

    LOG(INFO) << "~RxSinkBase() - released";
}

// Releases the edge connection held by this sink by forwarding to
// SinkChannelOwner<T>::release_edge_connection().
//
// The trace message is emitted at INFO severity: releasing an edge is a routine lifecycle event,
// not an error, and INFO matches the trace messages in ~RxSinkBase().
template <typename T>
void RxSinkBase<T>::release_edge_connection()
{
    LOG(INFO) << "RxSinkBase releasing edge connection";
    SinkChannelOwner<T>::release_edge_connection();
}

template <typename T>
const rxcpp::observable<T>& RxSinkBase<T>::observable() const
{
Expand Down
18 changes: 17 additions & 1 deletion cpp/mrc/include/mrc/node/rx_source_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ class RxSourceBase : public ReadableProvider<T>,
public:
void source_add_watcher(std::shared_ptr<WatcherInterface> watcher);
void source_remove_watcher(std::shared_ptr<WatcherInterface> watcher);
void release_edge_connection();

protected:
RxSourceBase();
~RxSourceBase() override = default;
~RxSourceBase() override;

const rxcpp::observer<T>& observer() const;

Expand Down Expand Up @@ -84,6 +85,21 @@ RxSourceBase<T>::RxSourceBase() :
this->set_channel(std::make_unique<mrc::channel::BufferedChannel<T>>());
}

// Destructor: releases this source's edge connection via release_edge_connection(), bracketed by
// trace messages.
//
// The traces are emitted at INFO severity: object destruction is a routine lifecycle event, not an
// error, and INFO matches the equivalent traces in ~RxSinkBase().
template <typename T>
RxSourceBase<T>::~RxSourceBase()
{
    LOG(INFO) << "~RxSourceBase";
    this->release_edge_connection();
    LOG(INFO) << "~RxSourceBase - done";
}

// Releases the edge connection held by this source by forwarding to
// SourceChannelOwner<T>::release_edge_connection().
//
// The trace message is emitted at INFO severity: releasing an edge is a routine lifecycle event,
// not an error, so it should not appear in error logs.
template <typename T>
void RxSourceBase<T>::release_edge_connection()
{
    LOG(INFO) << "RxSourceBase releasing edge connection";
    SourceChannelOwner<T>::release_edge_connection();
}

template <typename T>
const rxcpp::observer<T>& RxSourceBase<T>::observer() const
{
Expand Down
20 changes: 14 additions & 6 deletions cpp/mrc/include/mrc/segment/builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ namespace {
namespace hana = boost::hana;

template <typename T>
auto has_source_add_watcher = hana::is_valid(
[](auto&& thing) -> decltype(std::forward<decltype(thing)>(thing).source_add_watcher(
std::declval<std::shared_ptr<mrc::WatcherInterface>>())) {});
auto has_source_add_watcher =
hana::is_valid([](auto&& thing) -> decltype(std::forward<decltype(thing)>(thing).source_add_watcher(
std::declval<std::shared_ptr<mrc::WatcherInterface>>())) {});

template <typename T>
auto has_sink_add_watcher = hana::is_valid(
[](auto&& thing) -> decltype(std::forward<decltype(thing)>(thing).sink_add_watcher(
std::declval<std::shared_ptr<mrc::WatcherInterface>>())) {});
auto has_sink_add_watcher =
hana::is_valid([](auto&& thing) -> decltype(std::forward<decltype(thing)>(thing).sink_add_watcher(
std::declval<std::shared_ptr<mrc::WatcherInterface>>())) {});

template <typename T>
void add_stats_watcher_if_rx_source(T& thing, std::string name)
Expand Down Expand Up @@ -464,6 +464,8 @@ void IBuilder::make_edge(SourceObjectT source, SinkObjectT sink)
auto& source_object = to_object_properties(source);
auto& sink_object = to_object_properties(sink);

LOG(INFO) << "Creating edge from " << source_object.name() << " to " << sink_object.name();

// If we can determine the type from the actual object, use that, then fall back to hints or defaults.
using deduced_source_type_t = first_non_void_type_t<source_sp_type_t, // Deduced type (if possible)
SourceNodeTypeT, // Explicit type hint
Expand All @@ -479,13 +481,18 @@ void IBuilder::make_edge(SourceObjectT source, SinkObjectT sink)

if (source_object.is_writable_acceptor() && sink_object.is_writable_provider())
{
LOG(INFO) << "\tCreating edge from WritableAcceptor to WritableProvider for " << source_object.name() << " to "
<< sink_object.name();
mrc::make_edge(source_object.template writable_acceptor_typed<deduced_source_type_t>(),
sink_object.template writable_provider_typed<deduced_sink_type_t>());
LOG(INFO) << "\tCreating edge from WritableAcceptor to WritableProvider for " << source_object.name() << " to "
<< sink_object.name() << " - done";
return;
}

if (source_object.is_readable_provider() && sink_object.is_readable_acceptor())
{
LOG(INFO) << "\tCreating edge from ReadableProvider to ReadableAcceptor";
mrc::make_edge(source_object.template readable_provider_typed<deduced_source_type_t>(),
sink_object.template readable_acceptor_typed<deduced_sink_type_t>());
return;
Expand Down Expand Up @@ -609,6 +616,7 @@ void IBuilder::add_throughput_counter(std::shared_ptr<Object<ObjectT>> segment_o
CHECK(segment_object->is_source());
using source_type_t = typename ObjectT::source_type_t;
auto counter = this->make_throughput_counter(runnable->name());

runnable->object().add_epilogue_tap([counter](const source_type_t& data) {
counter(1);
});
Expand Down
5 changes: 5 additions & 0 deletions cpp/mrc/include/mrc/segment/component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ class Component final : public Object<ResourceT>
Component(std::unique_ptr<ResourceT> resource) : m_resource(std::move(resource)) {}
~Component() final = default;

void destroy() final
{
m_resource.reset();
}

private:
ResourceT* get_object() const final
{
Expand Down
Loading