Skip to content

Commit

Permalink
- add redis password to red5 apps
Browse files Browse the repository at this point in the history
  • Loading branch information
ritzalam committed Apr 29, 2019
1 parent 1a39ecd commit 0bd6b21
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ class MeetingActor(
val elapsedInMin = TimeUtil.millisToMinutes(elapsedInMs)

if (props.recordProp.record &&
recordingChapterBreakLengthInMinutes > 0 &&
recordingChapterBreakLengthInMinutes > 0 &&
elapsedInMin > recordingChapterBreakLengthInMinutes) {
lastRecBreakSentOn = now
val event = MsgBuilder.buildRecordingChapterBreakSysMsg(props.meetingProp.intId, TimeUtil.timeNowInMs())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ public void setPassword(String password) {
}

protected void connectionStatusHandler(Event event, Logger log) {
System.out.println("******** RedisAwareCommunicator - " + event);
//System.out.println("******** RedisAwareCommunicator - " + event);

if (event instanceof ConnectedEvent) {
log.info("Connected to redis");
log.info("Connected to redis. clientName="+ clientName);
} else if (event instanceof ConnectionActivatedEvent) {
log.info("Connected to redis activated");
log.info("Connected to redis activated. clientName="+ clientName);
} else if (event instanceof DisconnectedEvent) {
log.info("Disconnected from redis");
log.info("Disconnected from redis. clientName="+ clientName);
} else if (event instanceof ConnectionDeactivatedEvent) {
log.info("Connected to redis deactivated");
log.info("Connected to redis deactivated. clientName="+ clientName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ case class RedisConfig(host: String, port: Int, password: Option[String], expire

trait RedisConnectionHandler {

val clientName: String

def subscribeToEventBus(redis: RedisClient, log: LoggingAdapter) {
val eventBus: EventBus = redis.getResources().eventBus()
// @todo : unsubscribe when connection is closed
Expand All @@ -21,13 +23,13 @@ trait RedisConnectionHandler {
println("******* RedisConnectionHandler - " + event)

if (event.isInstanceOf[ConnectedEvent]) {
log.info("Connected to redis")
log.info("Connected to redis. clientName=" + clientName)
} else if (event.isInstanceOf[ConnectionActivatedEvent]) {
log.info("Connection to redis activated")
log.info("Connection to redis activated. clientName=" + clientName)
} else if (event.isInstanceOf[DisconnectedEvent]) {
log.info("Disconnected from redis")
log.info("Disconnected from redis. clientName=" + clientName)
} else if (event.isInstanceOf[ConnectionDeactivatedEvent]) {
log.info("Connection to redis deactivated")
log.info("Connection to redis deactivated. clientName=" + clientName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@
*/
package org.bigbluebutton.app.screenshare

import scala.util.{ Failure, Success }
import akka.util.Timeout
import akka.pattern.ask
import org.bigbluebutton.app.screenshare.events.IEventsMessageBus
import org.bigbluebutton.app.screenshare.server.sessions.ScreenshareManager
import org.bigbluebutton.app.screenshare.server.sessions.messages._
import org.bigbluebutton.app.screenshare.server.util.LogHelper
import akka.actor.ActorSystem
import org.bigbluebutton.app.screenshare.redis.{ ScreenshareRedisSubscriberActor, ReceivedJsonMsgHandlerActor }
import org.bigbluebutton.app.screenshare.redis.{ ReceivedJsonMsgHandlerActor, ScreenshareRedisSubscriberActor }

import scala.concurrent.{ Await, Future, TimeoutException }
import scala.concurrent.duration._
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus

class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
val streamBaseUrl: String) extends IScreenShareApplication
with SystemConfiguration { //} with LogHelper {
import org.bigbluebutton.common2.redis.RedisConfig

class ScreenShareApplication(
val bus: IEventsMessageBus,
val jnlpFile: String,
val streamBaseUrl: String,
redisHost: String,
redisPort: Int,
redisPass: String,
redisExpireKey: Int) extends IScreenShareApplication
with SystemConfiguration { //with LogHelper {

implicit val system = ActorSystem("bbb-screenshare-system")
implicit val timeout = akka.util.Timeout(3 seconds)
Expand All @@ -48,10 +52,19 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,

// Need to wrap redisPassword into Option as it may be
// null (ralam nov 29, 2018)
val redisPass = Option(redisPassword)

val redisPassword = Option(redisPass)
val redisConfig = RedisConfig(redisHost, redisPort, redisPassword, redisExpireKey)
val channelsToSubscribe = Seq(fromAkkaAppsRedisChannel)
val patternsToSubscribe = Nil
val incomingJsonMessageBus = new IncomingJsonMessageBus
val redisSubscriberActor = system.actorOf(ScreenshareRedisSubscriberActor.props(system, incomingJsonMessageBus), "redis-subscriber")
val redisSubscriberActor = system.actorOf(
ScreenshareRedisSubscriberActor.props(
system,
incomingJsonMessageBus,
redisConfig,
channelsToSubscribe,
patternsToSubscribe,
toScreenshareAppsJsonChannel), "redis-subscriber")

val screenShareManager = system.actorOf(ScreenshareManager.props(system, bus), "screenshare-manager")

Expand All @@ -65,17 +78,13 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
//}

def meetingHasEnded(meetingId: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received meetingHasEnded on meeting=" + meetingId + "]")
// }
//logger.info("Received meetingHasEnded on meeting=" + meetingId + "]")

screenShareManager ! new MeetingEnded(meetingId)
}

def meetingCreated(meetingId: String, record: java.lang.Boolean) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received meetingCreated on meeting=" + meetingId + "]")
// }
//logger.debug("Received meetingCreated on meeting=" + meetingId + "]")

screenShareManager ! new MeetingCreated(meetingId, record)

Expand Down Expand Up @@ -119,9 +128,7 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
new ScreenShareInfoResponse(info, null)
} catch {
case e: TimeoutException =>
// if (logger.isDebugEnabled()) {
// logger.debug("FAILED to get screen share info on meetingId=" + meetingId + "]")
// }
//logger.warn("FAILED to get screen share info on meetingId=" + meetingId + "]")
new ScreenShareInfoResponse(null, initError)
}

Expand All @@ -139,18 +146,15 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
}
} catch {
case e: TimeoutException =>
// if (logger.isDebugEnabled()) {
// logger.debug("FAILED to get sharing status on stream=" + streamId + "]")
// }
//logger.warn("FAILED to get sharing status on stream=" + streamId + "]")

new SharingStatus("STOP", null)
}

}

def recordStream(meetingId: String, streamId: String): java.lang.Boolean = {
// if (logger.isDebugEnabled()) {
// logger.debug("Received record stream request on stream=" + streamId + "]")
// }
//logger.info("Received record stream request on stream=" + streamId + "]")

var record = false

Expand All @@ -174,50 +178,50 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
}

def requestShareToken(meetingId: String, userId: String, record: java.lang.Boolean, tunnel: java.lang.Boolean) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received request share token on meeting=" + meetingId + "for user=" + userId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received request share token on meeting=" + meetingId + "for user=" + userId + "]")
//}

screenShareManager ! RequestShareTokenMessage(meetingId, userId, jnlpFile, record, tunnel)
}

def startShareRequest(meetingId: String, userId: String, session: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received start share request on meeting=" + meetingId + "for user=" + userId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received start share request on meeting=" + meetingId + "for user=" + userId + "]")
//}

screenShareManager ! StartShareRequestMessage(meetingId, userId, session)
}

def restartShareRequest(meetingId: String, userId: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received restart share request on meeting=[" + meetingId
// + "] from userId=[" + userId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received restart share request on meeting=[" + meetingId
// + "] from userId=[" + userId + "]")
//}
screenShareManager ! new RestartShareRequestMessage(meetingId, userId)
}

def pauseShareRequest(meetingId: String, userId: String, streamId: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received pause share request on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received pause share request on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
//}
screenShareManager ! new PauseShareRequestMessage(meetingId, userId, streamId)
}

def stopShareRequest(meetingId: String, streamId: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received stop share request on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received stop share request on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
// }
screenShareManager ! new StopShareRequestMessage(meetingId, streamId)
}

def streamStarted(meetingId: String, streamId: String, url: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received stream started on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received stream started on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
//}
screenShareManager ! new StreamStartedMessage(meetingId, streamId, url)
}

Expand All @@ -227,26 +231,26 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String,
}

def streamStopped(meetingId: String, streamId: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received stream stopped on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received stream stopped on meeting=[" + meetingId
// + "] for stream=[" + streamId + "]")
// }
screenShareManager ! new StreamStoppedMessage(meetingId, streamId)
}

def sharingStarted(meetingId: String, streamId: String, width: java.lang.Integer, height: java.lang.Integer) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received share started on meeting=[" + meetingId
// + "] for stream=[" + streamId + "] with region=[" + width + "x" + height + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received share started on meeting=[" + meetingId
// + "] for stream=[" + streamId + "] with region=[" + width + "x" + height + "]")
//}
screenShareManager ! new SharingStartedMessage(meetingId, streamId, width, height)
}

def sharingStopped(meetingId: String, streamId: String) {
// if (logger.isDebugEnabled()) {
// logger.debug("Received sharing stopped on meeting=" + meetingId
// + "for stream=" + streamId + "]")
// }
//if (logger.isDebugEnabled()) {
// logger.debug("Received sharing stopped on meeting=" + meetingId
// + "for stream=" + streamId + "]")
//}
screenShareManager ! new SharingStoppedMessage(meetingId, streamId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package org.bigbluebutton.app.screenshare

import scala.util.Try
import com.typesafe.config.ConfigFactory
import org.bigbluebutton.common2.redis.RedisConfiguration

trait SystemConfiguration extends RedisConfiguration {
trait SystemConfiguration {
val config = ConfigFactory.load()

lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel")
lazy val fromAkkaAppsRedisChannel = Try(config.getString("redis.fromAkkaAppsRedisChannel")).getOrElse("from-akka-apps-redis-channel")

lazy val meetingManagerChannel = Try(config.getString("eventBus.meetingManagerChannel")).getOrElse("NOT FROM APP CONF")

lazy val toScreenshareAppsJsonChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-screenshare-apps-json-channel")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,65 @@
package org.bigbluebutton.app.screenshare.redis

import org.bigbluebutton.app.screenshare.SystemConfiguration
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
import org.bigbluebutton.common2.redis.RedisSubscriberProvider

import org.bigbluebutton.common2.bus._
import org.bigbluebutton.common2.redis.{ RedisConfig, RedisSubscriberProvider }
import akka.actor.ActorSystem
import akka.actor.Props
import io.lettuce.core.pubsub.RedisPubSubListener

object ScreenshareRedisSubscriberActor extends SystemConfiguration {

val channels = Seq(fromAkkaAppsRedisChannel)
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*")
object ScreenshareRedisSubscriberActor {

def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props =
def props(
system: ActorSystem,
jsonMsgBus: IncomingJsonMessageBus,
redisConfig: RedisConfig,
channelsToSubscribe: Seq[String],
patternsToSubscribe: Seq[String],
forwardMsgToChannel: String): Props =
Props(
classOf[ScreenshareRedisSubscriberActor],
system, jsonMsgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher")
system,
jsonMsgBus,
redisConfig,
channelsToSubscribe,
patternsToSubscribe,
forwardMsgToChannel).withDispatcher("akka.redis-subscriber-worker-dispatcher")
}

class ScreenshareRedisSubscriberActor(
system: ActorSystem,
jsonMsgBus: IncomingJsonMessageBus,
redisHost: String, redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberProvider(system, "BbbScreenshareAkkaSub", channels, patterns, jsonMsgBus) with SystemConfiguration {
redisConfig: RedisConfig,
channelsToSubscribe: Seq[String],
patternsToSubscribe: Seq[String],
forwardMsgToChannel: String)
extends RedisSubscriberProvider(
system,
"BbbScreenshareAkkaSub",
channelsToSubscribe,
patternsToSubscribe,
jsonMsgBus,
redisConfig) {

addListener(toScreenshareAppsJsonChannel)
/**
* override def addListener(forwardMsgToChannel: String) {
* connection.addListener(new RedisPubSubListener[String, String] {
* def message(channel: String, message: String): Unit = {
* println("**** RECEIVED MESSASGE FROm CHANNLE " + channel)
* if (channelsToSubscribe.contains(channel)) {
* val receivedJsonMessage = new ReceivedJsonMessage(channel, message)
* jsonMsgBus.publish(IncomingJsonMessage(forwardMsgToChannel, receivedJsonMessage))
* }
* }
* def message(pattern: String, channel: String, message: String): Unit = {
* log.debug(s"RECEIVED:\n ${message} \n")
* }
* def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
* def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
* def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
* def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
* })
* }
*/
addListener(forwardMsgToChannel)
subscribe()
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
init-method="start" destroy-method="stop">
<property name="host" value="${redis.host}" />
<property name="port" value="${redis.port}" />
<property name="password" value="${redis.password:}" />
<property name="clientName" value="BbbScreenshare" />
<property name="password" value="${redis.password}" />
<property name="expireKey" value="${redis.keyExpiry}" />
<property name="clientName" value="BbbScreenshareRedisStore" />
</bean>
</beans>
Loading

0 comments on commit 0bd6b21

Please sign in to comment.