Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Reciever is not type casting the object as expected #3102

Open
sunilbm opened this issue Feb 10, 2025 · 4 comments
Open

Stream Reciever is not type casting the object as expected #3102

sunilbm opened this issue Feb 10, 2025 · 4 comments
Assignees
Labels
status: waiting-for-feedback We need additional information before we can continue

Comments

@sunilbm
Copy link

sunilbm commented Feb 10, 2025

I have a custom DTO User:

{
   "event_ts" : 1658940994914,
   "event": "session.started",
    "name" : "",
    "email" : "",
    "address" : {
       "state":"",
       "city": ""
}
}

Redis template is built in the following manner :


       Jackson2JsonRedisSerializer<User> serializer = new Jackson2JsonRedisSerializer<>(
            CustomObjectMapper, User.class);
        final RedisSerializationContextBuilder<String, User> builder = RedisSerializationContext
            .newSerializationContext(new StringRedisSerializer());

        RedisSerializationContext<String, User> serializationContext = builder
                .value(serializer)
                .hashValue(serializer)
                .build();

Data to redis stream is sent with the help of this code

    public Mono<RecordId> sendWithRetry(final T record, final String streamName) {
        final ObjectRecord<String, T> payload = StreamRecords.newRecord()
                .ofObject(record)
                .withStreamKey(streamName);

        return this.redisTemplate.opsForStream(new Jackson2HashMapper(JsonUtils.allocateDefaultObjectMapper(), true))
            .add(payload);
    }

At the receiver end we have configured a Stream Receiver as shown below :

StreamReceiver.StreamReceiverOptions
            .builder()
            .batchSize(100)
            .pollTimeout(Duration.ofSeconds(0))
            .objectMapper(new Jackson2HashMapper(JsonUtils.allocateDefaultObjectMapper(), true))
            .targetType(User.class)
            .build();

And the receiver code is written as follow :


final Consumer consumer = Consumer.from(consumerGroupName(), consumerName());
        final StreamOffset<String> streamOffset = StreamOffset.create(streamName(), ReadOffset.lastConsumed());

        final Flux<ObjectRecord<String, User>> streamListener = this.streamReceiver.receive(consumer, streamOffset);

        streamListener
            .repeatWhen(longFlux -> Flux.interval(Duration.ofMillis(100)))
            .doOnNext(record -> {
                getLog().debug("Received message from the stream [{}] with ID [{}} and message [{}]",
                    record.getStream(), record.getId(), record.getValue());
                    return;
            })
           .subscribe();

I have two issues to report :

  1. Even though I have defined the target type in stream receiver, the record received is in the form of linked hash map.
  2. If I enable the debugger , the values in the map are wrapped with the additional double quotes. For example : if my data has an attribute "event": "session.started", the read value looks like "event": ""session.started"". I thought of converting the Map returned to an object explicitly with the help of an object mapper but the double quotes is causing trouble.

I have attached the screen shot of how the data is returned.

Image

can someone help me with this. I have tested this on 3.4.2 and 3.4.0 and is reproducible in both the version

@christophstrobl
Copy link
Member

Thank you for reporting. It seems you already have some code that can reproduce the issue. It would help a lot if you could package it up and provide a complete minimal sample (something that we can unzip or git clone, build, and deploy) that reproduces the problem.

@christophstrobl christophstrobl added status: waiting-for-feedback We need additional information before we can continue and removed status: waiting-for-triage An issue we've not yet triaged labels Feb 11, 2025
@sunilbm
Copy link
Author

sunilbm commented Feb 13, 2025

@christophstrobl
The code snippets shared were extracted from a bigger project. I will try to extract these snippets to a separate project , so that you can work on it.
Will try to share this by this weekend , hope that works for you.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Feb 13, 2025
@sunilbm
Copy link
Author

sunilbm commented Feb 15, 2025

Archive.zip

@christophstrobl
I have attached a zip with the code. Let me know if you need any other details.

@christophstrobl
Copy link
Member

Thank you for the reproducer unfortunately this one contains a lot of custom moving parts including custom jackson configuration, different serializations, that makes it hard to spot the problem in reasonable time.

I think it would make sense to have an isolated view on the two problems.

From what I saw, the ObjectMapper provided to the Jackson2HashMapper does not do any default typing so it is not able to reason about the type when deserializing, thus falling back to a Map structure.

Jackson2HashMapper hashMapper = new Jackson2HashMapper(JsonUtils.allocateDefaultObjectMapper(), false);
Map<String, Object> hash = hashMapper.toHash(TestData.USER_DTO);
Object fromHash = hashMapper.fromHash(hash); <-- fromHash will be a LinkedHashMap

@christophstrobl christophstrobl added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Feb 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: waiting-for-feedback We need additional information before we can continue
Projects
None yet
Development

No branches or pull requests

3 participants