diff --git a/bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoConstructorInterceptor.java b/bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoConstructorInterceptor.java index 89bac39137a4..4cdb1663add0 100644 --- a/bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoConstructorInterceptor.java +++ b/bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoConstructorInterceptor.java @@ -22,7 +22,7 @@ import com.navercorp.pinpoint.bootstrap.logging.PLogger; import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory; -public abstract class FluxAndMonoConstructorInterceptor implements AroundInterceptor { +public class FluxAndMonoConstructorInterceptor implements AroundInterceptor { private final PLogger logger = PLoggerFactory.getLogger(getClass()); private final boolean isDebug = logger.isDebugEnabled(); diff --git a/bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeOrReturnInterceptor.java b/bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeOrReturnInterceptor.java new file mode 100644 index 000000000000..05c3c5369034 --- /dev/null +++ b/bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeOrReturnInterceptor.java @@ -0,0 +1,93 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils; +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; +import com.navercorp.pinpoint.bootstrap.context.TraceContext; +import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor; +import com.navercorp.pinpoint.bootstrap.logging.PLogger; +import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory; + +public class FluxAndMonoSubscribeOrReturnInterceptor implements AroundInterceptor { + private final PLogger logger = PLoggerFactory.getLogger(getClass()); + private final boolean isDebug = logger.isDebugEnabled(); + + public FluxAndMonoSubscribeOrReturnInterceptor() { + } + + @Override + public void before(Object target, Object[] args) { + } + + @Override + public void after(Object target, Object[] args, Object result, Throwable throwable) { + if (isDebug) { + logger.afterInterceptor(target, args, result, throwable); + } + + if (throwable != null) { + // Ignore if an error occurs. + return; + } + if (result == null) { + return; + } + + if (checkTargetReactorContextAccessor(target, args, result)) { + return; + } + if (checkTargetAsyncContextAccessor(target, args, result)) { + return; + } + if (checkSubscriberReactorContextAccessor(target, args, result)) { + return; + } + } + + boolean checkTargetReactorContextAccessor(final Object target, final Object[] args, final Object result) { + final AsyncContext asyncContext = ReactorContextAccessorUtils.getAsyncContext(target); + if (asyncContext != null) { + setReactorContextToResult(asyncContext, result); + return true; + } + return false; + } + + boolean checkTargetAsyncContextAccessor(final Object target, final Object[] args, final Object result) { + final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target); + if (asyncContext != null) { + setReactorContextToResult(asyncContext, result); + return true; + } + return false; + } + + boolean checkSubscriberReactorContextAccessor(final Object target, final Object[] args, final Object result) { + final AsyncContext asyncContext = ReactorContextAccessorUtils.getAsyncContext(args, 0); + if (asyncContext != null) { + setReactorContextToResult(asyncContext, result); + return true; + } + return false; + } + + protected void setReactorContextToResult(AsyncContext asyncContext, Object result) { + ReactorContextAccessorUtils.setAsyncContext(asyncContext, result); + } +} diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/CoreSubscriberConstructorInterceptorTest.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/CoreSubscriberConstructorInterceptorTest.java new file mode 100644 index 000000000000..6d79f2c99989 --- /dev/null +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/CoreSubscriberConstructorInterceptorTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class CoreSubscriberConstructorInterceptorTest { + + @Test + public void arg0ContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + CoreSubscriberConstructorInterceptor interceptor = new CoreSubscriberConstructorInterceptor(); + + // Set asyncContext to target + arg0._$PINPOINT$_setReactorContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, new Object(), null); + + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), arg0._$PINPOINT$_getReactorContext()); + } + + @Test + public void argNotContainReactorContext() { + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + CoreSubscriberConstructorInterceptor interceptor = new CoreSubscriberConstructorInterceptor(); + + // Not set asyncContext to target + interceptor.after(target, new Object[]{arg0}, new Object(), null); + + assertNull(target._$PINPOINT$_getReactorContext()); + } + + @Test + public void arg1ContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg1 = new MockAsyncContextAndReactorContextImpl(); + CoreSubscriberConstructorInterceptor interceptor = new CoreSubscriberConstructorInterceptor(); + + // Set asyncContext to target + arg1._$PINPOINT$_setReactorContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0, arg1}, new Object(), null); + + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), arg1._$PINPOINT$_getReactorContext()); + } + + @Test + public void throwableIsNotNull() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + Throwable throwable = new Throwable("ERROR"); + CoreSubscriberConstructorInterceptor interceptor = new CoreSubscriberConstructorInterceptor(); + + arg0._$PINPOINT$_setReactorContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, new Object(), throwable); + + assertNull(target._$PINPOINT$_getReactorContext()); + } +} \ No newline at end of file diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoConstructorInterceptorTest.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoConstructorInterceptorTest.java new file mode 100644 index 000000000000..232061c0b0a1 --- /dev/null +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoConstructorInterceptorTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class FluxAndMonoConstructorInterceptorTest { + + @Test + public void arg0ContainAsyncContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoConstructorInterceptor interceptor = new FluxAndMonoConstructorInterceptor(); + + // Set asyncContext to target + arg0._$PINPOINT$_setAsyncContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, new Object(), null); + + assertNotNull(target._$PINPOINT$_getAsyncContext()); + assertEquals(target._$PINPOINT$_getAsyncContext(), arg0._$PINPOINT$_getAsyncContext()); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), arg0._$PINPOINT$_getAsyncContext()); + } + + @Test + public void arg0NotContainAsyncContext() { + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoConstructorInterceptor interceptor = new FluxAndMonoConstructorInterceptor(); + + // Not set asyncContext to target + interceptor.after(target, new Object[]{arg0}, new Object(), null); + + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + } + + @Test + public void arg1ContainAsyncContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg1 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoConstructorInterceptor interceptor = new FluxAndMonoConstructorInterceptor(); + + // Set asyncContext to target + arg1._$PINPOINT$_setAsyncContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0, arg1}, new Object(), null); + + assertNotNull(target._$PINPOINT$_getAsyncContext()); + assertEquals(target._$PINPOINT$_getAsyncContext(), arg1._$PINPOINT$_getAsyncContext()); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), arg1._$PINPOINT$_getAsyncContext()); + } + + @Test + public void throwableIsNotNull() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + Throwable throwable = new Throwable("ERROR"); + FluxAndMonoConstructorInterceptor interceptor = new FluxAndMonoConstructorInterceptor(); + + interceptor.after(target, new Object[]{arg0}, new Object(), throwable); + + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + } +} \ No newline at end of file diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoOperatorConstructorInterceptorTest.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoOperatorConstructorInterceptorTest.java new file mode 100644 index 000000000000..b4b5ab20951b --- /dev/null +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoOperatorConstructorInterceptorTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class FluxAndMonoOperatorConstructorInterceptorTest { + + @Test + public void arg0ContainAsyncContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorConstructorInterceptor interceptor = new FluxAndMonoOperatorConstructorInterceptor(); + + // Set asyncContext to target + arg0._$PINPOINT$_setAsyncContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, new Object(), null); + + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), mockAsyncContext); + } + + @Test + public void arg0ContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorConstructorInterceptor interceptor = new FluxAndMonoOperatorConstructorInterceptor(); + + // Set asyncContext to target + arg0._$PINPOINT$_setReactorContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, new Object(), null); + + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), mockAsyncContext); + } + + @Test + public void arg0NotContainAsyncContext() { + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorConstructorInterceptor interceptor = new FluxAndMonoOperatorConstructorInterceptor(); + + // Not set asyncContext to target + interceptor.after(target, new Object[]{arg0}, new Object(), null); + + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + } + + @Test + public void arg1ContainAsyncContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg1 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorConstructorInterceptor interceptor = new FluxAndMonoOperatorConstructorInterceptor(); + + // Set asyncContext to target + arg1._$PINPOINT$_setAsyncContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0, arg1}, new Object(), null); + + assertNotNull(target._$PINPOINT$_getAsyncContext()); + assertEquals(target._$PINPOINT$_getAsyncContext(), mockAsyncContext); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), mockAsyncContext); + } + + @Test + public void throwableIsNotNull() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + Throwable throwable = new Throwable("ERROR"); + FluxAndMonoOperatorConstructorInterceptor interceptor = new FluxAndMonoOperatorConstructorInterceptor(); + + interceptor.after(target, new Object[]{arg0}, new Object(), throwable); + + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + } +} \ No newline at end of file diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoOperatorSubscribeInterceptorTest.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoOperatorSubscribeInterceptorTest.java new file mode 100644 index 000000000000..24e192401e06 --- /dev/null +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoOperatorSubscribeInterceptorTest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; +import com.navercorp.pinpoint.bootstrap.context.TraceContext; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class FluxAndMonoOperatorSubscribeInterceptorTest { + final TraceContext mockTraceContext = mock(TraceContext.class); + final MethodDescriptor mockMethodDescriptor = mock(MethodDescriptor.class); + final ServiceType mockServiceType = mock(ServiceType.class); + + @Test + public void targetContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorSubscribeInterceptor interceptor = new FluxAndMonoOperatorSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Set asyncContext to target + target._$PINPOINT$_setReactorContext(mockAsyncContext); + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNotNull(arg0._$PINPOINT$_getReactorContext()); + assertEquals(arg0._$PINPOINT$_getReactorContext(), mockAsyncContext); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } + + @Test + public void targetContainAsyncContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorSubscribeInterceptor interceptor = new FluxAndMonoOperatorSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Set asyncContext to target + target._$PINPOINT$_setAsyncContext(mockAsyncContext); + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNotNull(asyncContext); + assertNotNull(target._$PINPOINT$_getAsyncContext()); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), mockAsyncContext); + assertNotNull(arg0._$PINPOINT$_getReactorContext()); + assertEquals(arg0._$PINPOINT$_getReactorContext(), mockAsyncContext); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNotNull(asyncContext); + assertEquals(asyncContext, mockAsyncContext); + } + + @Test + public void targetNotContainAsyncContext() { + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorSubscribeInterceptor interceptor = new FluxAndMonoOperatorSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Not set asyncContext to target + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + assertNull(arg0._$PINPOINT$_getReactorContext()); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } + + @Test + public void arg0ContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorSubscribeInterceptor interceptor = new FluxAndMonoOperatorSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Set asyncContext to target + arg0._$PINPOINT$_setReactorContext(mockAsyncContext); + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), mockAsyncContext); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } + + @Test + public void arg0NotContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoOperatorSubscribeInterceptor interceptor = new FluxAndMonoOperatorSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Not set asyncContext to target + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + assertNull(arg0._$PINPOINT$_getReactorContext()); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } +} \ No newline at end of file diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeInterceptorTest.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeInterceptorTest.java new file mode 100644 index 000000000000..2d9e52ab0339 --- /dev/null +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeInterceptorTest.java @@ -0,0 +1,145 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; +import com.navercorp.pinpoint.bootstrap.context.TraceContext; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class FluxAndMonoSubscribeInterceptorTest { + final TraceContext mockTraceContext = mock(TraceContext.class); + final MethodDescriptor mockMethodDescriptor = mock(MethodDescriptor.class); + final ServiceType mockServiceType = mock(ServiceType.class); + + @Test + public void targetContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeInterceptor interceptor = new FluxAndMonoSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Set asyncContext to target + target._$PINPOINT$_setReactorContext(mockAsyncContext); + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNotNull(arg0._$PINPOINT$_getReactorContext()); + assertEquals(arg0._$PINPOINT$_getReactorContext(), mockAsyncContext); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } + + @Test + public void targetContainAsyncContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeInterceptor interceptor = new FluxAndMonoSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Set asyncContext to target + target._$PINPOINT$_setAsyncContext(mockAsyncContext); + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNotNull(asyncContext); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), mockAsyncContext); + assertNotNull(arg0._$PINPOINT$_getReactorContext()); + assertEquals(arg0._$PINPOINT$_getReactorContext(), mockAsyncContext); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNotNull(asyncContext); + assertEquals(asyncContext, mockAsyncContext); + } + + @Test + public void targetNotContainAsyncContext() { + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeInterceptor interceptor = new FluxAndMonoSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Not set asyncContext to target + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + assertNull(arg0._$PINPOINT$_getReactorContext()); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } + + @Test + public void arg0ContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeInterceptor interceptor = new FluxAndMonoSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Set asyncContext to target + arg0._$PINPOINT$_setReactorContext(mockAsyncContext); + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNotNull(target._$PINPOINT$_getReactorContext()); + assertEquals(target._$PINPOINT$_getReactorContext(), mockAsyncContext); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } + + @Test + public void arg0NotContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeInterceptor interceptor = new FluxAndMonoSubscribeInterceptor(mockTraceContext, mockMethodDescriptor, mockServiceType); + + // Not set asyncContext to target + // before + AsyncContext asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}); + + assertNull(asyncContext); + assertNull(target._$PINPOINT$_getAsyncContext()); + assertNull(target._$PINPOINT$_getReactorContext()); + assertNull(arg0._$PINPOINT$_getReactorContext()); + + // after + asyncContext = interceptor.getAsyncContext(target, new Object[]{arg0}, new Object(), null); + + assertNull(asyncContext); + } +} \ No newline at end of file diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeOrReturnInterceptorTest.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeOrReturnInterceptorTest.java new file mode 100644 index 000000000000..11382fe9c166 --- /dev/null +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/FluxAndMonoSubscribeOrReturnInterceptorTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class FluxAndMonoSubscribeOrReturnInterceptorTest { + + @Test + public void targetContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl result = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeOrReturnInterceptor interceptor = new FluxAndMonoSubscribeOrReturnInterceptor(); + + // Set asyncContext to target + target._$PINPOINT$_setReactorContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, result, null); + + assertNotNull(result._$PINPOINT$_getReactorContext()); + assertEquals(result._$PINPOINT$_getReactorContext(), mockAsyncContext); + } + + @Test + public void targetContainAsyncContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl result = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeOrReturnInterceptor interceptor = new FluxAndMonoSubscribeOrReturnInterceptor(); + + // Set asyncContext to target + target._$PINPOINT$_setAsyncContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, result, null); + + assertNotNull(result._$PINPOINT$_getReactorContext()); + assertEquals(result._$PINPOINT$_getReactorContext(), mockAsyncContext); + } + + @Test + public void targetNotContainAsyncContext() { + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl result = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeOrReturnInterceptor interceptor = new FluxAndMonoSubscribeOrReturnInterceptor(); + + // Not set asyncContext to target + interceptor.after(target, new Object[]{arg0}, result, null); + + assertNull(result._$PINPOINT$_getReactorContext()); + } + + @Test + public void arg0ContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl result = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeOrReturnInterceptor interceptor = new FluxAndMonoSubscribeOrReturnInterceptor(); + + // Set asyncContext to target + arg0._$PINPOINT$_setReactorContext(mockAsyncContext); + interceptor.after(target, new Object[]{arg0}, result, null); + + assertNotNull(result._$PINPOINT$_getReactorContext()); + assertEquals(result._$PINPOINT$_getReactorContext(), mockAsyncContext); + } + + @Test + public void arg0NotContainReactorContext() { + AsyncContext mockAsyncContext = mock(AsyncContext.class); + MockAsyncContextAndReactorContextImpl target = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl arg0 = new MockAsyncContextAndReactorContextImpl(); + MockAsyncContextAndReactorContextImpl result = new MockAsyncContextAndReactorContextImpl(); + FluxAndMonoSubscribeOrReturnInterceptor interceptor = new FluxAndMonoSubscribeOrReturnInterceptor(); + + // Not set asyncContext to target + interceptor.after(target, new Object[]{arg0}, result, null); + + assertNull(result._$PINPOINT$_getReactorContext()); + } +} \ No newline at end of file diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/MockAsyncContextAndReactorContextImpl.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/MockAsyncContextAndReactorContextImpl.java new file mode 100644 index 000000000000..a5a541ec7c84 --- /dev/null +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/MockAsyncContextAndReactorContextImpl.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.bootstrap.plugin.reactor; + +import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor; +import com.navercorp.pinpoint.bootstrap.context.AsyncContext; + +public class MockAsyncContextAndReactorContextImpl implements AsyncContextAccessor, ReactorContextAccessor { + private AsyncContext asyncContext; + private AsyncContext reactorContext; + + @Override + public void _$PINPOINT$_setAsyncContext(AsyncContext asyncContext) { + this.asyncContext = asyncContext; + } + + @Override + public AsyncContext _$PINPOINT$_getAsyncContext() { + return asyncContext; + } + + @Override + public void _$PINPOINT$_setReactorContext(AsyncContext reactorContext) { + this.reactorContext = reactorContext; + } + + @Override + public AsyncContext _$PINPOINT$_getReactorContext() { + return reactorContext; + } +} diff --git a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/ReactorContextAccessorUtilsTest.java b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/ReactorContextAccessorUtilsTest.java index 28e1ab1c5493..63ed52bf707e 100644 --- a/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/ReactorContextAccessorUtilsTest.java +++ b/bootstraps/bootstrap-core/src/test/java/com/navercorp/pinpoint/bootstrap/plugin/reactor/ReactorContextAccessorUtilsTest.java @@ -16,13 +16,10 @@ package com.navercorp.pinpoint.bootstrap.plugin.reactor; -import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor; -import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils; import com.navercorp.pinpoint.bootstrap.context.AsyncContext; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; diff --git a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java index f5802c62a032..6425e2bffc12 100644 --- a/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java +++ b/plugins/reactor-netty/src/main/java/com/navercorp/pinpoint/plugin/reactor/netty/ReactorNettyPlugin.java @@ -37,6 +37,7 @@ import com.navercorp.pinpoint.bootstrap.plugin.reactor.FluxAndMonoOperatorConstructorInterceptor; import com.navercorp.pinpoint.bootstrap.plugin.reactor.FluxAndMonoOperatorSubscribeInterceptor; import com.navercorp.pinpoint.bootstrap.plugin.reactor.FluxAndMonoSubscribeInterceptor; +import com.navercorp.pinpoint.bootstrap.plugin.reactor.FluxAndMonoSubscribeOrReturnInterceptor; import com.navercorp.pinpoint.bootstrap.plugin.reactor.ReactorContextAccessor; import com.navercorp.pinpoint.common.trace.ServiceType; import com.navercorp.pinpoint.common.util.ArrayUtils; @@ -278,7 +279,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeInterceptor.class, va(ReactorNettyConstants.REACTOR_NETTY_INTERNAL)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } return target.toBytecode(); @@ -306,7 +307,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(FluxAndMonoOperatorSubscribeInterceptor.class, va(ReactorNettyConstants.REACTOR_NETTY_INTERNAL)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } return target.toBytecode(); } diff --git a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java index 5030f72c335d..2b9d7a38ee99 100644 --- a/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java +++ b/plugins/reactor/src/main/java/com/navercorp/pinpoint/plugin/reactor/ReactorPlugin.java @@ -32,6 +32,7 @@ import com.navercorp.pinpoint.bootstrap.plugin.ProfilerPlugin; import com.navercorp.pinpoint.bootstrap.plugin.ProfilerPluginSetupContext; import com.navercorp.pinpoint.bootstrap.plugin.reactor.CoreSubscriberConstructorInterceptor; +import com.navercorp.pinpoint.bootstrap.plugin.reactor.FluxAndMonoSubscribeOrReturnInterceptor; import com.navercorp.pinpoint.bootstrap.plugin.reactor.ReactorContextAccessor; import com.navercorp.pinpoint.common.util.ArrayUtils; import com.navercorp.pinpoint.plugin.reactor.interceptor.ConnectableFluxSubscribeInterceptor; @@ -106,325 +107,325 @@ public void setTransformTemplate(MatchableTransformTemplate transformTemplate) { private void addFlux() { transformTemplate.transform("reactor.core.publisher.Flux", FluxMethodTransform.class); // publishOn - transformTemplate.transform("reactor.core.publisher.FluxPublishOn", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber", RunnableCoreSubscriberTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxPublishOn$PublishOnSubscriber", RunnableCoreSubscriberTransform.class); + addFluxOperatorTransform("reactor.core.publisher.FluxPublishOn"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxPublishOn$PublishOnSubscriber"); // subscribeOn - transformTemplate.transform("reactor.core.publisher.FluxSubscribeOnValue", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSubscribeOnValue$ScheduledEmpty", RunnableCoreSubscriberTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSubscribeOnValue$ScheduledScalar", RunnableCoreSubscriberTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSubscribeOnCallable", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription", RunnableCoreSubscriberTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSubscribeOn", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber", RunnableCoreSubscriberTransform.class); + addFluxTransform("reactor.core.publisher.FluxSubscribeOnValue"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxSubscribeOnValue$ScheduledEmpty"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxSubscribeOnValue$ScheduledScalar"); + addFluxTransform("reactor.core.publisher.FluxSubscribeOnCallable"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription"); + addFluxOperatorTransform("reactor.core.publisher.FluxSubscribeOn"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber"); // Flux - transformTemplate.transform("reactor.core.publisher.ConnectableFluxHide", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ConnectableFluxOnAssembly", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ConnectableLift", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ConnectableLiftFuseable", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxArray", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxAutoConnect", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxAutoConnectFuseable", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxBuffer", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxBufferBoundary", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxBufferPredicate", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxBufferTimeout", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxBufferWhen", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxCallable", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxCallableOnAssembly", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxCancelOn", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxCombineLatest", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxConcatArray", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxConcatIterable", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxConcatMap", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxConcatMapNoPrefetch", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxContextWrite", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxCreate", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDefaultIfEmpty", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDefer", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDeferContextual", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDelaySequence", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDelaySubscription", FluxDelaySubscriptionTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDematerialize", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDetach", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDistinct", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDistinctFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDistinctUntilChanged", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDoFinally", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDoFinallyFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDoFirst", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDoFirstFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDoOnEach", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxDoOnEachFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxElapsed", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxEmpty", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxError", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxErrorOnRequest", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxErrorSupplied", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxExpand", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFilter", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFilterFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFilterWhen", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFirstWithSignal", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFirstWithValue", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFlatMap", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFlattenIterable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxFromMonoOperator", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxGenerate", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxGroupBy", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxGroupJoin", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxHandle", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxHandleFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxHide", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxIndex", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxIndexFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxInterval", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxIterable", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxJoin", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxJust", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxLift", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxLiftFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxLimitRequest", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxLog", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxLogFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMap", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMapFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMapSignal", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMaterialize", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMerge", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMergeComparing", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMergeSequential", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMetrics", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMetricsFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxName", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxNameFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxNever", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxOnAssembly", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxOnBackpressureBuffer", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxOnBackpressureBufferStrategy", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxOnBackpressureBufferTimeout", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxOnBackpressureDrop", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxOnBackpressureLatest", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxOnErrorResume", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxPeek", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxPeekFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxPublish", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxPublishMulticast", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRange", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRefCount", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRefCountGrace", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRepeat", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRepeatPredicate", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRepeatWhen", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxReplay", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRetry", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRetryPredicate", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxRetryWhen", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSample", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSampleFirst", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSampleTimeout", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxScan", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxScanSeed", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSkip", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSkipLast", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSkipUntil", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSkipWhile", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSource", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSourceFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSourceMono", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSourceMonoFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxStream", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSwitchIfEmpty", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSwitchMap", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSwitchMapNoPrefetch", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSwitchOnFirst", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTake", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTakeFuseable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTakeLast", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTakeLastOne", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTakeUntil", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTakeUntilOther", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTakeWhile", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTimed", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxTimeout", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxUsing", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxUsingWhen", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxWindow", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxWindowBoundary", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxWindowPredicate", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxWindowTimeout", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxWindowWhen", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxWithLatestFrom", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxZip", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxZipIterable", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.GroupedLift", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.GroupedLiftFuseable", FluxOperatorTransform.class); - - transformTemplate.transform("reactor.core.publisher.FluxFirstEmitting", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxMergeOrdered", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSkipUntilOther", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxContextStart", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxPublishOnCaptured", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.FluxSchedulerCapture", FluxOperatorTransform.class); - - transformTemplate.transform("reactor.core.publisher.ParallelMergeOrdered", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelMergeSequential", FluxOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelMergeSort", FluxOperatorTransform.class); - - transformTemplate.transform("reactor.core.publisher.SinkManyBestEffort", FluxTransform.class); - transformTemplate.transform("reactor.core.publisher.UnicastManySinkNoBackpressure", FluxTransform.class); - - transformTemplate.transform("reactor.core.publisher.InternalConnectableFluxOperator", ConnectableFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.InternalFluxOperator", FluxOperatorTransform.class); + addConnectableFluxTransform("reactor.core.publisher.ConnectableFluxHide"); + addConnectableFluxTransform("reactor.core.publisher.ConnectableFluxOnAssembly"); + addConnectableFluxTransform("reactor.core.publisher.ConnectableLift"); + addConnectableFluxTransform("reactor.core.publisher.ConnectableLiftFuseable"); + addFluxTransform("reactor.core.publisher.FluxArray"); + addConnectableFluxTransform("reactor.core.publisher.FluxAutoConnect"); + addConnectableFluxTransform("reactor.core.publisher.FluxAutoConnectFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxBuffer"); + addFluxOperatorTransform("reactor.core.publisher.FluxBufferBoundary"); + addFluxOperatorTransform("reactor.core.publisher.FluxBufferPredicate"); + addFluxOperatorTransform("reactor.core.publisher.FluxBufferTimeout"); + addFluxOperatorTransform("reactor.core.publisher.FluxBufferWhen"); + addFluxTransform("reactor.core.publisher.FluxCallable"); + addFluxOperatorTransform("reactor.core.publisher.FluxCallableOnAssembly"); + addFluxOperatorTransform("reactor.core.publisher.FluxCancelOn"); + addFluxTransform("reactor.core.publisher.FluxCombineLatest"); + addFluxTransform("reactor.core.publisher.FluxConcatArray"); + addFluxTransform("reactor.core.publisher.FluxConcatIterable"); + addFluxOperatorTransform("reactor.core.publisher.FluxConcatMap"); + addFluxOperatorTransform("reactor.core.publisher.FluxConcatMapNoPrefetch"); + addFluxOperatorTransform("reactor.core.publisher.FluxContextWrite"); + addFluxTransform("reactor.core.publisher.FluxCreate"); + addFluxOperatorTransform("reactor.core.publisher.FluxDefaultIfEmpty"); + addFluxTransform("reactor.core.publisher.FluxDefer"); + addFluxTransform("reactor.core.publisher.FluxDeferContextual"); + addFluxOperatorTransform("reactor.core.publisher.FluxDelaySequence"); + addFluxDelaySubscriptionTransform("reactor.core.publisher.FluxDelaySubscription"); + addFluxOperatorTransform("reactor.core.publisher.FluxDematerialize"); + addFluxOperatorTransform("reactor.core.publisher.FluxDetach"); + addFluxOperatorTransform("reactor.core.publisher.FluxDistinct"); + addFluxOperatorTransform("reactor.core.publisher.FluxDistinctFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxDistinctUntilChanged"); + addFluxOperatorTransform("reactor.core.publisher.FluxDoFinally"); + addFluxOperatorTransform("reactor.core.publisher.FluxDoFinallyFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxDoFirst"); + addFluxOperatorTransform("reactor.core.publisher.FluxDoFirstFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxDoOnEach"); + addFluxOperatorTransform("reactor.core.publisher.FluxDoOnEachFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxElapsed"); + addFluxOperatorTransform("reactor.core.publisher.FluxEmpty"); + addFluxTransform("reactor.core.publisher.FluxError"); + addFluxTransform("reactor.core.publisher.FluxErrorOnRequest"); + addFluxTransform("reactor.core.publisher.FluxErrorSupplied"); + addFluxOperatorTransform("reactor.core.publisher.FluxExpand"); + addFluxOperatorTransform("reactor.core.publisher.FluxFilter"); + addFluxOperatorTransform("reactor.core.publisher.FluxFilterFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxFilterWhen"); + addFluxTransform("reactor.core.publisher.FluxFirstWithSignal"); + addFluxTransform("reactor.core.publisher.FluxFirstWithValue"); + addFluxOperatorTransform("reactor.core.publisher.FluxFlatMap"); + addFluxOperatorTransform("reactor.core.publisher.FluxFlattenIterable"); + addFluxOperatorTransform("reactor.core.publisher.FluxFromMonoOperator"); + addFluxTransform("reactor.core.publisher.FluxGenerate"); + addFluxOperatorTransform("reactor.core.publisher.FluxGroupBy"); + addFluxOperatorTransform("reactor.core.publisher.FluxGroupJoin"); + addFluxOperatorTransform("reactor.core.publisher.FluxHandle"); + addFluxOperatorTransform("reactor.core.publisher.FluxHandleFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxHide"); + addFluxOperatorTransform("reactor.core.publisher.FluxIndex"); + addFluxOperatorTransform("reactor.core.publisher.FluxIndexFuseable"); + addFluxTransform("reactor.core.publisher.FluxInterval"); + addFluxTransform("reactor.core.publisher.FluxIterable"); + addFluxOperatorTransform("reactor.core.publisher.FluxJoin"); + addFluxTransform("reactor.core.publisher.FluxJust"); + addFluxOperatorTransform("reactor.core.publisher.FluxLift"); + addFluxOperatorTransform("reactor.core.publisher.FluxLiftFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxLimitRequest"); + addFluxOperatorTransform("reactor.core.publisher.FluxLog"); + addFluxOperatorTransform("reactor.core.publisher.FluxLogFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxMap"); + addFluxOperatorTransform("reactor.core.publisher.FluxMapFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxMapSignal"); + addFluxOperatorTransform("reactor.core.publisher.FluxMaterialize"); + addFluxTransform("reactor.core.publisher.FluxMerge"); + addFluxTransform("reactor.core.publisher.FluxMergeComparing"); + addFluxOperatorTransform("reactor.core.publisher.FluxMergeSequential"); + addFluxOperatorTransform("reactor.core.publisher.FluxMetrics"); + addFluxOperatorTransform("reactor.core.publisher.FluxMetricsFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxName"); + addFluxOperatorTransform("reactor.core.publisher.FluxNameFuseable"); + addFluxTransform("reactor.core.publisher.FluxNever"); + addFluxOperatorTransform("reactor.core.publisher.FluxOnAssembly"); + addFluxOperatorTransform("reactor.core.publisher.FluxOnBackpressureBuffer"); + addFluxOperatorTransform("reactor.core.publisher.FluxOnBackpressureBufferStrategy"); + addFluxOperatorTransform("reactor.core.publisher.FluxOnBackpressureBufferTimeout"); + addFluxOperatorTransform("reactor.core.publisher.FluxOnBackpressureDrop"); + addFluxOperatorTransform("reactor.core.publisher.FluxOnBackpressureLatest"); + addFluxOperatorTransform("reactor.core.publisher.FluxOnErrorResume"); + addFluxOperatorTransform("reactor.core.publisher.FluxPeek"); + addFluxOperatorTransform("reactor.core.publisher.FluxPeekFuseable"); + addConnectableFluxTransform("reactor.core.publisher.FluxPublish"); + addFluxOperatorTransform("reactor.core.publisher.FluxPublishMulticast"); + addFluxTransform("reactor.core.publisher.FluxRange"); + addFluxTransform("reactor.core.publisher.FluxRefCount"); + addFluxTransform("reactor.core.publisher.FluxRefCountGrace"); + addFluxOperatorTransform("reactor.core.publisher.FluxRepeat"); + addFluxOperatorTransform("reactor.core.publisher.FluxRepeatPredicate"); + addFluxOperatorTransform("reactor.core.publisher.FluxRepeatWhen"); + addConnectableFluxTransform("reactor.core.publisher.FluxReplay"); + addFluxOperatorTransform("reactor.core.publisher.FluxRetry"); + addFluxOperatorTransform("reactor.core.publisher.FluxRetryPredicate"); + addFluxOperatorTransform("reactor.core.publisher.FluxRetryWhen"); + addFluxOperatorTransform("reactor.core.publisher.FluxSample"); + addFluxOperatorTransform("reactor.core.publisher.FluxSampleFirst"); + addFluxOperatorTransform("reactor.core.publisher.FluxSampleTimeout"); + addFluxOperatorTransform("reactor.core.publisher.FluxScan"); + addFluxOperatorTransform("reactor.core.publisher.FluxScanSeed"); + addFluxOperatorTransform("reactor.core.publisher.FluxSkip"); + addFluxOperatorTransform("reactor.core.publisher.FluxSkipLast"); + addFluxOperatorTransform("reactor.core.publisher.FluxSkipUntil"); + addFluxOperatorTransform("reactor.core.publisher.FluxSkipWhile"); + addFluxTransform("reactor.core.publisher.FluxSource"); + addFluxOperatorTransform("reactor.core.publisher.FluxSourceFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxSourceMono"); + addFluxOperatorTransform("reactor.core.publisher.FluxSourceMonoFuseable"); + addFluxTransform("reactor.core.publisher.FluxStream"); + addFluxOperatorTransform("reactor.core.publisher.FluxSwitchIfEmpty"); + addFluxOperatorTransform("reactor.core.publisher.FluxSwitchMap"); + addFluxOperatorTransform("reactor.core.publisher.FluxSwitchMapNoPrefetch"); + addFluxOperatorTransform("reactor.core.publisher.FluxSwitchOnFirst"); + addFluxOperatorTransform("reactor.core.publisher.FluxTake"); + addFluxOperatorTransform("reactor.core.publisher.FluxTakeFuseable"); + addFluxOperatorTransform("reactor.core.publisher.FluxTakeLast"); + addFluxOperatorTransform("reactor.core.publisher.FluxTakeLastOne"); + addFluxOperatorTransform("reactor.core.publisher.FluxTakeUntil"); + addFluxOperatorTransform("reactor.core.publisher.FluxTakeUntilOther"); + addFluxOperatorTransform("reactor.core.publisher.FluxTakeWhile"); + addFluxOperatorTransform("reactor.core.publisher.FluxTimed"); + addFluxOperatorTransform("reactor.core.publisher.FluxTimeout"); + addFluxTransform("reactor.core.publisher.FluxUsing"); + addFluxTransform("reactor.core.publisher.FluxUsingWhen"); + addFluxOperatorTransform("reactor.core.publisher.FluxWindow"); + addFluxOperatorTransform("reactor.core.publisher.FluxWindowBoundary"); + addFluxOperatorTransform("reactor.core.publisher.FluxWindowPredicate"); + addFluxOperatorTransform("reactor.core.publisher.FluxWindowTimeout"); + addFluxOperatorTransform("reactor.core.publisher.FluxWindowWhen"); + addFluxOperatorTransform("reactor.core.publisher.FluxWithLatestFrom"); + addFluxTransform("reactor.core.publisher.FluxZip"); + addFluxOperatorTransform("reactor.core.publisher.FluxZipIterable"); + addFluxTransform("reactor.core.publisher.GroupedLift"); + addFluxOperatorTransform("reactor.core.publisher.GroupedLiftFuseable"); + + addFluxTransform("reactor.core.publisher.FluxFirstEmitting"); + addFluxTransform("reactor.core.publisher.FluxMergeOrdered"); + addFluxOperatorTransform("reactor.core.publisher.FluxSkipUntilOther"); + addFluxOperatorTransform("reactor.core.publisher.FluxContextStart"); + addFluxOperatorTransform("reactor.core.publisher.FluxPublishOnCaptured"); + addFluxOperatorTransform("reactor.core.publisher.FluxSchedulerCapture"); + + addFluxOperatorTransform("reactor.core.publisher.ParallelMergeOrdered"); + addFluxOperatorTransform("reactor.core.publisher.ParallelMergeSequential"); + addFluxOperatorTransform("reactor.core.publisher.ParallelMergeSort"); + + addFluxTransform("reactor.core.publisher.SinkManyBestEffort"); + addFluxTransform("reactor.core.publisher.UnicastManySinkNoBackpressure"); + + addConnectableFluxTransform("reactor.core.publisher.InternalConnectableFluxOperator"); + addFluxOperatorTransform("reactor.core.publisher.InternalFluxOperator"); } private void addMono() { transformTemplate.transform("reactor.core.publisher.Mono", MonoMethodTransform.class); // publishOn - transformTemplate.transform("reactor.core.publisher.MonoPublishOn", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoPublishOn$PublishOnSubscriber", RunnableCoreSubscriberTransform.class); + addMonoOperatorTransform("reactor.core.publisher.MonoPublishOn"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoPublishOn$PublishOnSubscriber"); // subscribeOn - transformTemplate.transform("reactor.core.publisher.MonoSubscribeOnValue", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSubscribeOnCallable", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSubscribeOn", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber", RunnableCoreSubscriberTransform.class); + addMonoTransform("reactor.core.publisher.MonoSubscribeOnValue"); + addMonoTransform("reactor.core.publisher.MonoSubscribeOnCallable"); + addMonoOperatorTransform("reactor.core.publisher.MonoSubscribeOn"); + addRunnableCoreSubscriberTransform("reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber"); // mono - transformTemplate.transform("reactor.core.publisher.MonoAll", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoAny", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCacheInvalidateIf", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCacheInvalidateWhen", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCallable", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCacheTime", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCancelOn", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCollect", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCollectList", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCompletionStage", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoContextWrite", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCount", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCreate", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoCurrentContext", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDefaultIfEmpty", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDefer", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDeferContextual", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDelay", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDelayElement", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDelaySubscription", MonoDelaySubscriptionTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDelayUntil", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDematerialize", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDetach", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDoFinally", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDoFinallyFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFirst", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDoFirstFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDoOnEach", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoDoOnEachFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoElapsed", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoElementAt", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoEmpty", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoError", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoErrorSupplied", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoExpand", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFilter", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFilterFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFilterWhen", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFirstWithSignal", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFirstWithValue", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFlatMap", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFlatMapMany", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFlattenIterable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFromFluxOperator", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoFromPublisher", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoHandle", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoHandleFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoHasElement", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoHasElements", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoHide", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoIgnoreElement", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoIgnoreElements", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoIgnorePublisher", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoIgnoreThen", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoJust", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoLift", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoLiftFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoLog", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoLogFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoMap", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoMapFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoMaterialize", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoMetrics", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoMetricsFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoName", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoNameFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoNever", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoNext", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoOnAssembly", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoOnErrorResume", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoPeek", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoPeekFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoPeekTerminal", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoPublishMulticast", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoReduce", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoReduceSeed", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoRepeat", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoRepeatPredicate", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoRepeatWhen", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoRetry", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoRetryPredicate", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoRetryWhen", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoRunnable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSequenceEqual", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSingle", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSingleCallable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSingleMono", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSource", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSourceFlux", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSourceFluxFuseable", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSourceFuseable", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoStreamCollector", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSupplier", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoSwitchIfEmpty", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoTakeLastOne", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoTakeUntilOther", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoTimed", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoTimeout", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoUsing", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoUsingWhen", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoWhen", MonoTransform.class); - transformTemplate.transform("reactor.core.publisher.MonoZip", MonoTransform.class); - - transformTemplate.transform("reactor.core.publisher.MonoSubscriberContext", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelMergeReduce", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelThen", MonoOperatorTransform.class); - - transformTemplate.transform("reactor.core.publisher.SinkEmptyMulticast", MonoOperatorTransform.class); - transformTemplate.transform("reactor.core.publisher.InternalMonoOperator", MonoOperatorTransform.class); + addMonoOperatorTransform("reactor.core.publisher.MonoAll"); + addMonoOperatorTransform("reactor.core.publisher.MonoAny"); + addMonoOperatorTransform("reactor.core.publisher.MonoCacheInvalidateIf"); + addMonoOperatorTransform("reactor.core.publisher.MonoCacheInvalidateWhen"); + addMonoTransform("reactor.core.publisher.MonoCallable"); + addMonoOperatorTransform("reactor.core.publisher.MonoCacheTime"); + addMonoOperatorTransform("reactor.core.publisher.MonoCancelOn"); + addMonoOperatorTransform("reactor.core.publisher.MonoCollect"); + addMonoOperatorTransform("reactor.core.publisher.MonoCollectList"); + addMonoTransform("reactor.core.publisher.MonoCompletionStage"); + addMonoTransform("reactor.core.publisher.MonoContextWrite"); + addMonoOperatorTransform("reactor.core.publisher.MonoCount"); + addMonoTransform("reactor.core.publisher.MonoCreate"); + addMonoTransform("reactor.core.publisher.MonoCurrentContext"); + addMonoOperatorTransform("reactor.core.publisher.MonoDefaultIfEmpty"); + addMonoTransform("reactor.core.publisher.MonoDefer"); + addMonoTransform("reactor.core.publisher.MonoDeferContextual"); + addMonoTransform("reactor.core.publisher.MonoDelay"); + addMonoOperatorTransform("reactor.core.publisher.MonoDelayElement"); + addMonoDelaySubscriptionTransform("reactor.core.publisher.MonoDelaySubscription"); + addMonoTransform("reactor.core.publisher.MonoDelayUntil"); + addMonoOperatorTransform("reactor.core.publisher.MonoDematerialize"); + addMonoOperatorTransform("reactor.core.publisher.MonoDetach"); + addMonoOperatorTransform("reactor.core.publisher.MonoDoFinally"); + addMonoOperatorTransform("reactor.core.publisher.MonoDoFinallyFuseable"); + addMonoTransform("reactor.core.publisher.MonoFirst"); + addMonoOperatorTransform("reactor.core.publisher.MonoDoFirstFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoDoOnEach"); + addMonoOperatorTransform("reactor.core.publisher.MonoDoOnEachFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoElapsed"); + addMonoOperatorTransform("reactor.core.publisher.MonoElementAt"); + addMonoTransform("reactor.core.publisher.MonoEmpty"); + addMonoTransform("reactor.core.publisher.MonoError"); + addMonoTransform("reactor.core.publisher.MonoErrorSupplied"); + addMonoOperatorTransform("reactor.core.publisher.MonoExpand"); + addMonoOperatorTransform("reactor.core.publisher.MonoFilter"); + addMonoOperatorTransform("reactor.core.publisher.MonoFilterFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoFilterWhen"); + addMonoTransform("reactor.core.publisher.MonoFirstWithSignal"); + addMonoTransform("reactor.core.publisher.MonoFirstWithValue"); + addMonoOperatorTransform("reactor.core.publisher.MonoFlatMap"); + addMonoOperatorTransform("reactor.core.publisher.MonoFlatMapMany"); + addMonoOperatorTransform("reactor.core.publisher.MonoFlattenIterable"); + addMonoTransform("reactor.core.publisher.MonoFromFluxOperator"); + addMonoOperatorTransform("reactor.core.publisher.MonoFromPublisher"); + addMonoOperatorTransform("reactor.core.publisher.MonoHandle"); + addMonoOperatorTransform("reactor.core.publisher.MonoHandleFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoHasElement"); + addMonoOperatorTransform("reactor.core.publisher.MonoHasElements"); + addMonoOperatorTransform("reactor.core.publisher.MonoHide"); + addMonoOperatorTransform("reactor.core.publisher.MonoIgnoreElement"); + addMonoOperatorTransform("reactor.core.publisher.MonoIgnoreElements"); + addMonoOperatorTransform("reactor.core.publisher.MonoIgnorePublisher"); + addMonoTransform("reactor.core.publisher.MonoIgnoreThen"); + addMonoTransform("reactor.core.publisher.MonoJust"); + addMonoOperatorTransform("reactor.core.publisher.MonoLift"); + addMonoOperatorTransform("reactor.core.publisher.MonoLiftFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoLog"); + addMonoOperatorTransform("reactor.core.publisher.MonoLogFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoMap"); + addMonoOperatorTransform("reactor.core.publisher.MonoMapFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoMaterialize"); + addMonoOperatorTransform("reactor.core.publisher.MonoMetrics"); + addMonoOperatorTransform("reactor.core.publisher.MonoMetricsFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoName"); + addMonoOperatorTransform("reactor.core.publisher.MonoNameFuseable"); + addMonoTransform("reactor.core.publisher.MonoNever"); + addMonoOperatorTransform("reactor.core.publisher.MonoNext"); + addMonoOperatorTransform("reactor.core.publisher.MonoOnAssembly"); + addMonoOperatorTransform("reactor.core.publisher.MonoOnErrorResume"); + addMonoOperatorTransform("reactor.core.publisher.MonoPeek"); + addMonoOperatorTransform("reactor.core.publisher.MonoPeekFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoPeekTerminal"); + addMonoOperatorTransform("reactor.core.publisher.MonoPublishMulticast"); + addMonoOperatorTransform("reactor.core.publisher.MonoReduce"); + addMonoOperatorTransform("reactor.core.publisher.MonoReduceSeed"); + addMonoOperatorTransform("reactor.core.publisher.MonoRepeat"); + addMonoOperatorTransform("reactor.core.publisher.MonoRepeatPredicate"); + addMonoOperatorTransform("reactor.core.publisher.MonoRepeatWhen"); + addMonoOperatorTransform("reactor.core.publisher.MonoRetry"); + addMonoOperatorTransform("reactor.core.publisher.MonoRetryPredicate"); + addMonoOperatorTransform("reactor.core.publisher.MonoRetryWhen"); + addMonoOperatorTransform("reactor.core.publisher.MonoRunnable"); + addMonoTransform("reactor.core.publisher.MonoSequenceEqual"); + addMonoOperatorTransform("reactor.core.publisher.MonoSingle"); + addMonoOperatorTransform("reactor.core.publisher.MonoSingleCallable"); + addMonoOperatorTransform("reactor.core.publisher.MonoSingleMono"); + addMonoTransform("reactor.core.publisher.MonoSource"); + addMonoOperatorTransform("reactor.core.publisher.MonoSourceFlux"); + addMonoOperatorTransform("reactor.core.publisher.MonoSourceFluxFuseable"); + addMonoTransform("reactor.core.publisher.MonoSourceFuseable"); + addMonoOperatorTransform("reactor.core.publisher.MonoStreamCollector"); + addMonoTransform("reactor.core.publisher.MonoSupplier"); + addMonoOperatorTransform("reactor.core.publisher.MonoSwitchIfEmpty"); + addMonoOperatorTransform("reactor.core.publisher.MonoTakeLastOne"); + addMonoOperatorTransform("reactor.core.publisher.MonoTakeUntilOther"); + addMonoOperatorTransform("reactor.core.publisher.MonoTimed"); + addMonoOperatorTransform("reactor.core.publisher.MonoTimeout"); + addMonoTransform("reactor.core.publisher.MonoUsing"); + addMonoTransform("reactor.core.publisher.MonoUsingWhen"); + addMonoTransform("reactor.core.publisher.MonoWhen"); + addMonoTransform("reactor.core.publisher.MonoZip"); + + addMonoOperatorTransform("reactor.core.publisher.MonoSubscriberContext"); + addMonoOperatorTransform("reactor.core.publisher.ParallelMergeReduce"); + addMonoOperatorTransform("reactor.core.publisher.ParallelThen"); + + addMonoOperatorTransform("reactor.core.publisher.SinkEmptyMulticast"); + addMonoOperatorTransform("reactor.core.publisher.InternalMonoOperator"); } private void addParallelFlux() { transformTemplate.transform("reactor.core.publisher.ParallelFlux", ParallelFluxMethodTransform.class); // runOn - transformTemplate.transform("reactor.core.publisher.ParallelRunOn", ParallelFluxTransform.class); - - transformTemplate.transform("reactor.core.publisher.ParallelArraySource", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelCollect", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelConcatMap", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelDoOnEach", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelFilter", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelFlatMap", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelFluxHide", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelFluxName", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelFluxOnAssembly", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelGroup", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelLift", ParallelFluxTransform.class); - - transformTemplate.transform("reactor.core.publisher.ParallelLiftFuseable", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelLog", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelMap", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelPeek", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelReduceSeed", ParallelFluxTransform.class); - transformTemplate.transform("reactor.core.publisher.ParallelSource", ParallelFluxTransform.class); + addParallelFluxTransform("reactor.core.publisher.ParallelRunOn"); + + addParallelFluxTransform("reactor.core.publisher.ParallelArraySource"); + addParallelFluxTransform("reactor.core.publisher.ParallelCollect"); + addParallelFluxTransform("reactor.core.publisher.ParallelConcatMap"); + addParallelFluxTransform("reactor.core.publisher.ParallelDoOnEach"); + addParallelFluxTransform("reactor.core.publisher.ParallelFilter"); + addParallelFluxTransform("reactor.core.publisher.ParallelFlatMap"); + addParallelFluxTransform("reactor.core.publisher.ParallelFluxHide"); + addParallelFluxTransform("reactor.core.publisher.ParallelFluxName"); + addParallelFluxTransform("reactor.core.publisher.ParallelFluxOnAssembly"); + addParallelFluxTransform("reactor.core.publisher.ParallelGroup"); + addParallelFluxTransform("reactor.core.publisher.ParallelLift"); + + addParallelFluxTransform("reactor.core.publisher.ParallelLiftFuseable"); + addParallelFluxTransform("reactor.core.publisher.ParallelLog"); + addParallelFluxTransform("reactor.core.publisher.ParallelMap"); + addParallelFluxTransform("reactor.core.publisher.ParallelPeek"); + addParallelFluxTransform("reactor.core.publisher.ParallelReduceSeed"); + addParallelFluxTransform("reactor.core.publisher.ParallelSource"); } private void addCoreSubscriber() { @@ -461,7 +462,12 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin } } + void addFluxTransform(String className) { + transformTemplate.transform(className, FluxTransform.class); + } + public static class FluxTransform implements TransformCallback { + @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { final InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer); @@ -483,13 +489,17 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(FluxSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } return target.toBytecode(); } } + void addFluxOperatorTransform(String className) { + transformTemplate.transform(className, FluxOperatorTransform.class); + } + public static class FluxOperatorTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { @@ -512,14 +522,19 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(FluxOperatorSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } return target.toBytecode(); } } + void addFluxDelaySubscriptionTransform(String className) { + transformTemplate.transform(className, FluxDelaySubscriptionTransform.class); + } + public static class FluxDelaySubscriptionTransform implements TransformCallback { + @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { final InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer); @@ -541,7 +556,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(FluxDelaySubscriptionSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } final InstrumentMethod acceptMethod = target.getDeclaredMethod("accept", "reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionOtherSubscriber"); if (acceptMethod != null) { @@ -552,6 +567,10 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin } } + void addRunnableCoreSubscriberTransform(String className) { + transformTemplate.transform(className, RunnableCoreSubscriberTransform.class); + } + public static class RunnableCoreSubscriberTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { @@ -592,6 +611,10 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin } } + void addMonoTransform(String className) { + transformTemplate.transform(className, MonoTransform.class); + } + public static class MonoTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { @@ -613,12 +636,17 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(MonoSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } + return target.toBytecode(); } } + void addMonoOperatorTransform(String className) { + transformTemplate.transform(className, MonoOperatorTransform.class); + } + public static class MonoOperatorTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { @@ -640,12 +668,17 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(MonoOperatorSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } + return target.toBytecode(); } } + void addMonoDelaySubscriptionTransform(String className) { + transformTemplate.transform(className, MonoDelaySubscriptionTransform.class); + } + public static class MonoDelaySubscriptionTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { @@ -668,7 +701,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin // since 3.3.0 final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(MonoDelaySubscriptionSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); + subscribeOrReturnMethod.addInterceptor(FluxAndMonoSubscribeOrReturnInterceptor.class); } final InstrumentMethod acceptMethod = target.getDeclaredMethod("accept", "reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionOtherSubscriber"); if (acceptMethod != null) { @@ -692,6 +725,10 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin } } + void addParallelFluxTransform(String className) { + transformTemplate.transform(className, ParallelFluxTransform.class); + } + public static class ParallelFluxTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { @@ -732,6 +769,10 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin } } + void addConnectableFluxTransform(String className) { + transformTemplate.transform(className, ConnectableFluxTransform.class); + } + public static class ConnectableFluxTransform implements TransformCallback { @Override public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { @@ -751,11 +792,6 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin if (subscribeMethod != null) { subscribeMethod.addInterceptor(ConnectableFluxSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); } - // since 3.3.0 - final InstrumentMethod subscribeOrReturnMethod = target.getDeclaredMethod("subscribeOrReturn", "reactor.core.CoreSubscriber"); - if (subscribeOrReturnMethod != null) { - subscribeOrReturnMethod.addInterceptor(ConnectableFluxSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY)); - } final InstrumentMethod connectMethod = target.getDeclaredMethod("connect", "java.util.function.Consumer"); if (connectMethod != null) { connectMethod.addInterceptor(ConnectableFluxSubscribeInterceptor.class, va(ReactorConstants.REACTOR_NETTY));