Ciaran0 February 2016

Akka-Http Websockets: How to Send consumers the same stream of data

I have a WebSocket that clients can connect to I also have a stream of data using akka-streams. How can I make it that all clients get the same data. At the moment they seem to be racing for the data.

Thanks

Answers


Al Iacovella February 2016

One way you could do is is to have an actor that extends ActorPublisher and have it subscribe to some message.

class MyPublisher extends ActorPublisher[MyData]{

  override def preStart = {
    context.system.eventStream.subscribe(self, classOf[MyData])
  }

  override def receive: Receive = {

    case msg: MyData ⇒
      if (isActive && totalDemand > 0) {
        // Pushes the message onto the stream
        onNext(msg)
      }
  }
}

object MyPublisher {
  def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}

case class MyData(data:String)

You can then use that actor as the source for the stream:

val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))

You can then create a flow from that datasource and apply a transform to convert the data into a websocket message

val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})

Then you can use that flow in your route handling.

path("readings") {
  handleWebsocketMessages(myFlow)
} 

From the processing of the original stream, you can then publish the data to the event stream and any instance of that actor will pick it up and put in onto the stream that their websocket is being served from.

  val actorSystem = ActorSystem("foo")

  val otherSource = Source.fromIterator(()  => List(MyData("a"), MyData("b")).iterator)

  otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}

Each socket will then have its own instance of the actor to provide it with data all coming from a single source.

Post Status

Asked in February 2016
Viewed 2,993 times
Voted 8
Answered 1 times

Search




Leave an answer