Spring Cloud GCP Pubsub Consumer Deserialization

3 minute read Published:

Having issues deserializing pubsub message with Spring Cloud?

Lately I’ve been working on a project where we’re consuming messages from a Google PubSub topic. Whenever it comes to consuming or producing data from middleware typically Spring Cloud Stream is my go to framework and project. The special thing about SCS is that you can change binders, or middleware rather, at will. While it may seem unlikely that you’ll ever be switching middleware on a regular basis, it does indeed happen. One example that always comes to mind is where I was working with a client who was convinced that they wanted to use Confluent Kafka as their middleware, only to find out that Google was giving kickbacks for using their service so they wanted to migrate all of there apps to use PubSub instead. With SCS it was just a matter of including the correct dependencies and modifying some configuration. The migration itself for a single app only took about 20 or so minutes, which in and of itself is pretty amazing.

I’ll quit fanboying though about the project and get on to the real purpose of this. Lately when we moved to the latest version of Spring Cloud Stream (3.x.x) we noticed that the new programming model was being utilized. Instead of having annotated methods with @StreamListener and @SendTo they want their users to incorporate the functional model.

The problem we ran into was when we were trying to initialize the input type of the function. What is it? A PubsubMessage or TransferOperation? The solution we found was that a PubsubMessage was being unwrapped into a Spring Message. The PubsubMessage attributes were being copied into the header of the Message while the TransferOperation was in the payload. This allowed us to define our function as follows.

 0// Kotlin function
 1fun notification(): Function<Message<TransferOperation?>, CustomNotification> {
 2    return Function { message: Message<TransferOperation?> ->
 3        val headers = message.headers
 4
 5        CustomNotification.newBuilder()
 6            .setProjectId(headers["projectId"].toString())
 7            .setEventType(headers["eventType"].toString())
 8            .setTransferJobName(headers["transferJobName"].toString())
 9            .setTransferOperationName(headers["transferOperationName"].toString())
10            .build()
11    }
12}

However we still ran into an issue with the framework deserializing the data into the TransferOperation model provided by Google. To fix this we’re able to configure our own message converter.

 0class GoogleJsonMessageConverter : AbstractMessageConverter(MimeType("application", "json")) {
 1
 2    private val defaultInstance = JacksonFactory.getDefaultInstance()
 3
 4    public override fun supports(clazz: Class<*>): Boolean {
 5        return TransferOperation::class.java == clazz
 6    }
 7
 8    public override fun convertFromInternal(
 9        message: Message<*>, targetClass: Class<*>, @Nullable conversionHint: Any?
10    ): Any? {
11        val payload: ByteArray = message.getPayload() as ByteArray
12        return defaultInstance.fromString(String(payload), TransferOperation::class.java)
13    }
14}

Note that the highlighted line in the above block references the JacksonFactory that is provided by Google specifically to deserialize Google model objects included in their API libraries. The JacksonFactory used in the above code block is specifically provided by com.google.http-client:google-http-client-jackson2.

The above class is then returned by a configuration bean below

0@Configuration
1class ServiceNotificationConfiguration {
2    @Bean
3    fun customMessageConverter(): MessageConverter? {
4        return GoogleJsonMessageConverter()
5    }
6}

Essentially what happens is that when the function is triggered, the framework detects that it is of MimeType application/json and uses this custom deserializer to deserialize the data and bind to a TransferOperation object.

The issue was none of this was really laid out in the Spring Cloud String Pubsub Binder documentation clearly so we had to go through a bit of pain to figure it out. In any case, it’s provided here for you if you run into the same issue, enjoy!