diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala old mode 100644 new mode 100755 index ca8f4ed7f6db..e4dc18f2f5c1 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala @@ -4,7 +4,7 @@ import org.bigbluebutton.common2.bus.IncomingJsonMessageBus import org.bigbluebutton.common2.redis.RedisPublisher import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor import org.bigbluebutton.transcode.JsonMsgHdlrActor - +import org.bigbluebutton.common2.redis.RedisConfig import akka.actor.ActorSystem import org.bigbluebutton.common2.redis.MessageSender import org.bigbluebutton.transcode.core.TranscodingInGW @@ -13,7 +13,15 @@ object Boot extends App with SystemConfiguration { implicit val system = ActorSystem("bigbluebutton-transcode-system") - val redisPublisher = new RedisPublisher(system, "BbbTranscodeAkkaPub") + val redisPass = if (redisPassword != "") Some(redisPassword) else None + val redisConfig = RedisConfig(redisHost, redisPort, redisPass, redisExpireKey) + + val redisPublisher = new RedisPublisher( + system, + "BbbTranscodeAkkaPub", + redisConfig + ) + val msgSender = new MessageSender(redisPublisher) var inGW = new TranscodingInGW(system, msgSender) @@ -22,5 +30,17 @@ object Boot extends App with SystemConfiguration { val redisMessageHandlerActor = system.actorOf(JsonMsgHdlrActor.props(inGW)) inJsonMsgBus.subscribe(redisMessageHandlerActor, toAkkaTranscodeJsonChannel) - val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, inJsonMsgBus), "redis-subscriber") + val channelsToSubscribe = Seq(toAkkaTranscodeRedisChannel) + + val redisSubscriberActor = system.actorOf( + AppsRedisSubscriberActor.props( + system, + inJsonMsgBus, + redisConfig, + channelsToSubscribe, + Nil, + toAkkaTranscodeJsonChannel + ), + "redis-subscriber" + ) } diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala old mode 100644 new mode 100755 index db2574f8d043..b7a4abda8b2f --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala @@ -1,14 +1,26 @@ package org.bigbluebutton import scala.util.Try +import com.typesafe.config.ConfigFactory -import org.bigbluebutton.common2.redis.RedisConfiguration +trait SystemConfiguration { + val config = ConfigFactory.load() + + // Redis server configuration + lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1") + lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379) + lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("") + lazy val redisExpireKey = Try(config.getInt("redis.keyExpiry")).getOrElse(1209600) -trait SystemConfiguration extends RedisConfiguration { lazy val _ffmpegPath = Try(config.getString("transcoder.ffmpeg-path")).getOrElse("/usr/local/bin/ffmpeg") lazy val _ffprobePath = Try(config.getString("transcoder.ffprobe-path")).getOrElse("/usr/local/bin/ffprobe") lazy val _videoconfLogoImagePath = Try(config.getString("videoconference.videoconf-logo-image-path")).getOrElse("") lazy val _enableUserVideoSubtitle = Try(config.getString("videoconference.enable-user-video-subtitle").toBoolean).getOrElse(false) lazy val _sipVideoResolution = Try(config.getString("videoconference.sip-video-resolution")).getOrElse("") + + lazy val toAkkaTranscodeRedisChannel = Try(config.getString("redis.toAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:to-bbb-transcode:system") + lazy val fromAkkaTranscodeRedisChannel = Try(config.getString("redis.fromAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:from-bbb-transcode:system") + lazy val toAkkaTranscodeJsonChannel = Try(config.getString("eventBus.toAkkaTranscodeJsonChannel")).getOrElse("to-akka-transcode-json-channel") + lazy val fromAkkaTranscodeJsonChannel = Try(config.getString("eventBus.fromAkkaTranscodeJsonChannel")).getOrElse("from-akka-transcode-json-channel") } diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala index 805501b31f3d..1429ec282f17 100755 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala @@ -1,48 +1,63 @@ -package org.bigbluebutton.endpoint.redis - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.DurationInt - -import org.bigbluebutton.SystemConfiguration -import org.bigbluebutton.common2.bus.IncomingJsonMessageBus -import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider } - -import akka.actor.ActorSystem -import akka.actor.Props - -object AppsRedisSubscriberActor extends RedisSubscriber { - - val channels = Seq(toAkkaTranscodeRedisChannel) - val patterns = Seq("bigbluebutton:to-bbb-transcode:*") - - def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props = - Props( - classOf[AppsRedisSubscriberActor], - system, jsonMsgBus, - redisHost, redisPort, - channels, patterns - ).withDispatcher("akka.redis-subscriber-worker-dispatcher") -} - -class AppsRedisSubscriberActor( - system: ActorSystem, - msgBus: IncomingJsonMessageBus, redisHost: String, - redisPort: Int, - channels: Seq[String] = Nil, patterns: Seq[String] = Nil -) - extends RedisSubscriberProvider(system, "BbbTranscodeAkkaSub", channels, patterns, msgBus) with SystemConfiguration { - - var lastPongReceivedOn = 0L - system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage()) - - def checkPongMessage() { - val now = System.currentTimeMillis() - - if (lastPongReceivedOn != 0 && (now - lastPongReceivedOn > 30000)) { - log.error("BBB-Transcode pubsub error!"); - } - } - - addListener(toAkkaTranscodeJsonChannel) - subscribe() -} +package org.bigbluebutton.endpoint.redis + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt +import org.bigbluebutton.common2.redis.RedisConfig +import org.bigbluebutton.common2.bus.IncomingJsonMessageBus +import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider } + +import akka.actor.ActorSystem +import akka.actor.Props + +object AppsRedisSubscriberActor { + + def props( + system: ActorSystem, + jsonMsgBus: IncomingJsonMessageBus, + redisConfig: RedisConfig, + channelsToSubscribe: Seq[String], + patternsToSubscribe: Seq[String], + forwardMsgToChannel: String + ): Props = + Props( + classOf[AppsRedisSubscriberActor], + system, + jsonMsgBus, + redisConfig, + channelsToSubscribe, + patternsToSubscribe, + forwardMsgToChannel + ).withDispatcher("akka.redis-subscriber-worker-dispatcher") +} + +class AppsRedisSubscriberActor( + system: ActorSystem, + jsonMsgBus: IncomingJsonMessageBus, + redisConfig: RedisConfig, + channelsToSubscribe: Seq[String], + patternsToSubscribe: Seq[String], + forwardMsgToChannel: String +) + extends RedisSubscriberProvider( + system, + "BbbTranscodeAkkaSub", + channelsToSubscribe, + patternsToSubscribe, + jsonMsgBus, + redisConfig + ) { + + var lastPongReceivedOn = 0L + system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage()) + + def checkPongMessage() { + val now = System.currentTimeMillis() + + if (lastPongReceivedOn != 0 && (now - lastPongReceivedOn > 30000)) { + log.error("BBB-Transcode pubsub error!"); + } + } + + addListener(forwardMsgToChannel) + subscribe() +} diff --git a/akka-bbb-transcode/src/universal/conf/application.conf b/akka-bbb-transcode/src/universal/conf/application.conf old mode 100644 new mode 100755 index f7d6a7b185dc..79af4fd2796a --- a/akka-bbb-transcode/src/universal/conf/application.conf +++ b/akka-bbb-transcode/src/universal/conf/application.conf @@ -22,6 +22,8 @@ redis { host="127.0.0.1" port=6379 password="" + # recording keys should expire in 14 days + keyExpiry=1209600 } videoconference {