Class SessionService

java.lang.Object
org.apache.beam.sdk.io.solace.broker.SessionService
All Implemented Interfaces:
Serializable
Direct Known Subclasses:
JcsmpSessionService

public abstract class SessionService extends Object implements Serializable
The SessionService interface provides a set of methods for managing a session with the Solace messaging system. It allows for establishing a connection, creating a message-receiver object, checking if the connection is closed or not, and gracefully closing the session.

Override this class and the method getSessionProperties() with your specific properties, including all those related to authentication.

The connector will call the method only once per session created, so you can perform relatively heavy operations in that method (e.g. connect to a store or vault to retrieve credentials).

There are some default properties that are set by default and can be overridden in this provider, that are relevant for the writer connector, and not used in the case of the read connector (since they are not necessary for reading):

  • VPN_NAME: default
  • GENERATE_SEND_TIMESTAMPS: true
  • PUB_MULTI_THREAD: true

The connector overrides other properties, regardless of what this provider sends to the connector. Those properties are the following. Again, these properties are only relevant for the write connector.

  • PUB_ACK_WINDOW_SIZE
  • MESSAGE_CALLBACK_ON_REACTOR
Those properties are set by the connector based on the values of SolaceIO.Write.withWriterType(SolaceIO.WriterType) and SolaceIO.Write.withSubmissionMode(SolaceIO.SubmissionMode).

The method will always run in a worker thread or task, and not in the driver program. If you need to access any resource to set the properties, you need to make sure that the worker has the network connectivity required for that, and that any credential or configuration is passed to the provider through the constructor.

The connector ensures that no two threads will be calling that method at the same time, so you don't have to take any specific precautions to avoid race conditions.

For basic authentication, use BasicAuthJcsmpSessionServiceFactory.

For other situations, you need to extend this class and implement the `equals` method, so two instances of your class can be compared by value. We recommend using AutoValue for that. For instance:


 {@literal }@AutoValue
 public class MySessionService extends SessionService {
   abstract String authToken();

   public static MySessionService create(String authToken) {
       return new AutoValue_MySessionService(authToken);
   }

   {@literal }@Override
   public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
     baseProps.setProperty(JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2);
     baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken());
     return props;
   }

   {@literal }@Override
   public void connect() {
       ...
   }

   ...
 }
 
See Also:
  • Field Details

  • Constructor Details

    • SessionService

      public SessionService()
  • Method Details

    • connect

      public abstract void connect()
      Establishes a connection to the service. This could involve providing connection details like host, port, VPN name, username, and password.
    • close

      public abstract void close()
      Gracefully closes the connection to the service.
    • getReceiver

      public abstract MessageReceiver getReceiver()
      Returns a MessageReceiver object for receiving messages from Solace. If it is the first time this method is used, the receiver is created from the session instance, otherwise it returns the receiver created initially.
    • getInitializedProducer

      public abstract MessageProducer getInitializedProducer(SolaceIO.SubmissionMode mode)
      Returns a MessageProducer object for publishing messages to Solace. If it is the first time this method is used, the producer is created from the session instance, otherwise it returns the producer created initially.
    • getPublishedResultsQueue

      public abstract Queue<Solace.PublishResult> getPublishedResultsQueue()
      Returns the Queue<Solace.PublishResult> instance associated with this session, with the asynchronously received callbacks from Solace for message publications. The queue implementation has to be thread-safe for production use-cases.
    • getSessionProperties

      public abstract JCSMPProperties getSessionProperties()
      Override this method and provide your specific properties, including all those related to authentication, and possibly others too.

      The method will be used whenever the session needs to be created or refreshed. If you are setting credentials with expiration, just make sure that the latest available credentials (e.g. renewed token) are set when the method is called.

      For a list of all the properties that can be set, please check the following link:

    • equals

      public abstract boolean equals(@Nullable Object other)
      You need to override this method to be able to compare these objects by value. We recommend using AutoValue for that.
      Overrides:
      equals in class Object
    • hashCode

      public abstract int hashCode()
      You need to override this method to be able to compare these objects by value. We recommend using AutoValue for that.
      Overrides:
      hashCode in class Object
    • initializeWriteSessionProperties

      public final JCSMPProperties initializeWriteSessionProperties(SolaceIO.SubmissionMode mode)
      This method will be called by the write connector when a new session is started.

      This call will happen in the worker, so you need to make sure that the worker has access to the resources you need to set the properties.

      The call will happen only once per session initialization. Typically, that will be when the worker and the client are created. But if for any reason the session is lost (e.g. expired auth token), this method will be called again.