From d50f65fce1938a359ac3b216544e6939ac340a73 Mon Sep 17 00:00:00 2001 From: Pedro Beschorner Marin Date: Sun, 23 Jul 2017 20:47:42 +0000 Subject: [PATCH] Moved akka-transcode messages to 2.0 format --- .../core/BigBlueButtonActor.scala | 3 + .../core/MessageSenderActor.scala | 6 -- .../core2/message/senders/MsgBuilder.scala | 10 ++ akka-bbb-transcode/build.sbt | 55 ++++++++--- .../transcode/core/VideoTranscoder.java | 2 +- .../transcode/core/api/ITranscodingInGW.java | 12 --- .../pubsub/RedisMessageReceiver.java | 81 ---------------- .../transcode/util/Constants.java | 38 ++++++++ .../main/scala/org/bigbluebutton/Boot.scala | 18 ++-- .../bigbluebutton/SystemConfiguration.scala | 5 + .../redis/AppsRedisSubscriberActor.scala | 80 +++++++-------- .../endpoint/redis/RedisPublisher.scala | 3 - .../transcode/JsonMsgDeserializer.scala | 97 +++++++++++++++++++ .../transcode/JsonMsgHdlrActor.scala | 47 +++++++++ .../transcode/api/OutMessages.scala | 12 +-- .../transcode/bus/InJsonMsgBus.scala | 31 ++++++ .../transcode/core/MessageSenderActor.scala | 76 +++++++++------ .../transcode/core/TranscodersService.scala | 1 - .../transcode/core/TranscodingActor.scala | 6 +- .../{ => core}/TranscodingInGW.scala | 12 +-- .../core/apps/TranscodingObserverApp.scala | 10 +- .../common/messages/Constants.java | 18 ---- .../payload/StartProbingReplyMessage.java | 64 ------------ .../payload/StartProbingRequestMessage.java | 64 ------------ .../payload/StartTranscoderReplyMessage.java | 64 ------------ .../StartTranscoderRequestMessage.java | 64 ------------ .../StopMeetingTranscodersMessage.java | 52 ---------- .../payload/StopTranscoderReplyMessage.java | 58 ----------- .../payload/StopTranscoderRequestMessage.java | 58 ----------- .../TranscoderStatusUpdateMessage.java | 64 ------------ .../payload/UpdateTranscoderReplyMessage.java | 64 ------------ .../UpdateTranscoderRequestMessage.java | 64 ------------ .../common2/msgs/TranscoderMsgs.scala | 43 ++++++++ .../bigbluebutton/red5/pubsub/Constants.java | 34 +++++++ .../red5/pubsub/MessagePublisher.java | 85 +++++++++++----- .../server/red5/pubsub/MessagePublisher.java | 18 +--- .../lib/bbb/messages/Messaging.js | 8 +- .../transcode/StartTranscoderSysReqMsg.js | 17 ++++ .../transcode/StopTranscoderSysReqMsg.js | 16 +++ 39 files changed, 558 insertions(+), 902 deletions(-) delete mode 100644 akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/api/ITranscodingInGW.java delete mode 100644 akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/pubsub/RedisMessageReceiver.java create mode 100644 akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/util/Constants.java create mode 100644 akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgDeserializer.scala create mode 100644 akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgHdlrActor.scala create mode 100644 akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/bus/InJsonMsgBus.scala rename akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/{ => core}/TranscodingInGW.scala (84%) delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingReplyMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingRequestMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderReplyMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderRequestMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopMeetingTranscodersMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderReplyMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderRequestMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/TranscoderStatusUpdateMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderReplyMessage.java delete mode 100644 bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderRequestMessage.java create mode 100644 bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/TranscoderMsgs.scala create mode 100644 bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/Constants.java create mode 100644 labs/kurento-screenshare/lib/bbb/messages/transcode/StartTranscoderSysReqMsg.js create mode 100644 labs/kurento-screenshare/lib/bbb/messages/transcode/StopTranscoderSysReqMsg.js diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala index b2b971d6d4ac..4964f6c39c3e 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala @@ -174,6 +174,9 @@ class BigBlueButtonActor( val disconnectEvnt = MsgBuilder.buildDisconnectAllClientsSysMsg(msg.body.meetingId) outGW.send(disconnectEvnt) + val stopTranscodersCmd = MsgBuilder.buildStopMeetingTranscodersSysCmdMsg(msg.body.meetingId) + outGW.send(stopTranscodersCmd) + log.info("Destroyed meetingId={}", msg.body.meetingId) val destroyedEvent = MsgBuilder.buildMeetingDestroyedEvtMsg(msg.body.meetingId) outGW.send(destroyedEvent) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala index 44b03a0c0c2d..b620ce86afe8 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/MessageSenderActor.scala @@ -15,7 +15,6 @@ import scala.concurrent.duration._ import org.bigbluebutton.core.pubsub.senders.UsersMessageToJsonConverter import org.bigbluebutton.common.messages.UserEjectedFromMeetingMessage import org.bigbluebutton.common.converters.ToJsonEncoder -import org.bigbluebutton.common.messages.StopMeetingTranscodersMessage import scala.collection.JavaConverters object MessageSenderActor { @@ -280,11 +279,6 @@ class MessageSenderActor(val service: MessageSender) service.send(MessagingConstants.FROM_USERS_CHANNEL, json) } - private def handleStopMeetingTranscoders(msg: StopMeetingTranscoders) { - val smt = new StopMeetingTranscodersMessage(msg.meetingID) - service.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, smt.toJson()) - } - private def handleGetGuestPolicyReply(msg: GetGuestPolicyReply) { val json = UsersMessageToJsonConverter.getGuestPolicyReplyToJson(msg) service.send(MessagingConstants.FROM_USERS_CHANNEL, json) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala index e817177c330f..cb24ab56444e 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala @@ -257,4 +257,14 @@ object MsgBuilder { BbbCommonEnvCoreMsg(envelope, event) } + + def buildStopMeetingTranscodersSysCmdMsg(meetingId: String): BbbCommonEnvCoreMsg = { + val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka") + val envelope = BbbCoreEnvelope(StopMeetingTranscodersSysCmdMsg.NAME, routing) + val body = StopMeetingTranscodersSysCmdMsgBody() + val header = BbbCoreHeaderWithMeetingId(StopMeetingTranscodersSysCmdMsg.NAME, meetingId) + val event = StopMeetingTranscodersSysCmdMsg(header, body) + + BbbCommonEnvCoreMsg(envelope, event) + } } diff --git a/akka-bbb-transcode/build.sbt b/akka-bbb-transcode/build.sbt index 68d5a5305231..c62896090492 100755 --- a/akka-bbb-transcode/build.sbt +++ b/akka-bbb-transcode/build.sbt @@ -4,9 +4,9 @@ name := "bbb-transcode-akka" organization := "org.bigbluebutton" -version := "0.0.1" +version := "0.0.2" -scalaVersion := "2.11.6" +scalaVersion := "2.12.2" scalacOptions ++= Seq( "-unchecked", @@ -14,7 +14,7 @@ scalacOptions ++= Seq( "-Xlint", "-Ywarn-dead-code", "-language:_", - "-target:jvm-1.7", + "-target:jvm-1.8", "-encoding", "UTF-8" ) @@ -24,6 +24,8 @@ resolvers ++= Seq( "blindside-repos" at "http://blindside.googlecode.com/svn/repository/" ) +resolvers += Resolver.sonatypeRepo("releases") + publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/dev/repo/maven-repo/releases" )) ) // We want to have our jar files in lib_managed dir. @@ -35,24 +37,47 @@ testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports") +val akkaVersion = "2.5.1" +val scalaTestV = "2.2.6" + libraryDependencies ++= { - val akkaVersion = "2.3.11" Seq( - "com.typesafe.akka" %% "akka-actor" % akkaVersion, - "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "ch.qos.logback" % "logback-classic" % "1.0.3", - "org.pegdown" % "pegdown" % "1.4.0", - "junit" % "junit" % "4.11", - "com.etaty.rediscala" %% "rediscala" % "1.4.0", - "commons-codec" % "commons-codec" % "1.8", + "junit" % "junit" % "4.11", + "commons-codec" % "commons-codec" % "1.10", "joda-time" % "joda-time" % "2.3", - "com.google.code.gson" % "gson" % "1.7.1", - "redis.clients" % "jedis" % "2.1.0", - "org.apache.commons" % "commons-lang3" % "3.2", - "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT" + "org.apache.commons" % "commons-lang3" % "3.2" )} +libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT" + +// https://mvnrepository.com/artifact/org.scala-lang/scala-library +libraryDependencies += "org.scala-lang" % "scala-library" % "2.12.2" +// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler +libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.2" + +// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12 +libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion + +// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12 +libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion + +// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12 +libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0" + +// For generating test reports +libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test" +// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12 +libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % "2.5.1" % "test" + +// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12 +libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test" + +// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12 +libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.3" % "test" + +libraryDependencies += "org.mockito" % "mockito-core" % "2.7.22" % "test" + seq(Revolver.settings: _*) scalariformSettings diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoder.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoder.java index 27c47bc00b6f..f1d5e330db5a 100644 --- a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoder.java +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/VideoTranscoder.java @@ -15,7 +15,7 @@ import org.bigbluebutton.transcode.core.ffmpeg.FFmpegUtils; import org.bigbluebutton.transcode.core.processmonitor.ProcessMonitor; import org.bigbluebutton.transcode.core.processmonitor.ProcessMonitorObserver; -import org.bigbluebutton.common.messages.Constants; +import org.bigbluebutton.transcode.util.Constants; import akka.actor.UntypedActor; import akka.actor.ActorRef; diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/api/ITranscodingInGW.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/api/ITranscodingInGW.java deleted file mode 100644 index 2a992e6bbdea..000000000000 --- a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/core/api/ITranscodingInGW.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.bigbluebutton.transcode.core.api; - -import java.util.Map; - - -public interface ITranscodingInGW { - void startTranscoder(String meetingId, String transcoderId, Map params); - void updateTranscoder(String meetingId, String transcoderId, Map params); - void stopTranscoder(String meetingId, String transcoderId); - void stopMeetingTranscoders(String meetingId); - void startProbing(String meetingId, String transcoderId, Map params); -} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/pubsub/RedisMessageReceiver.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/pubsub/RedisMessageReceiver.java deleted file mode 100644 index cb5432057c24..000000000000 --- a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/pubsub/RedisMessageReceiver.java +++ /dev/null @@ -1,81 +0,0 @@ -package org.bigbluebutton.transcode.pubsub.receivers; - -import org.bigbluebutton.transcode.core.api.ITranscodingInGW; - -import org.bigbluebutton.common.messages.StartTranscoderRequestMessage; -import org.bigbluebutton.common.messages.UpdateTranscoderRequestMessage; -import org.bigbluebutton.common.messages.StopTranscoderRequestMessage; -import org.bigbluebutton.common.messages.StopMeetingTranscodersMessage; -import org.bigbluebutton.common.messages.StartProbingRequestMessage; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - -public class RedisMessageReceiver { - - public static final String TO_BBB_TRANSCODE_CHANNEL = "bigbluebutton:to-bbb-transcode"; - public static final String TO_BBB_TRANSCODE_PATTERN = TO_BBB_TRANSCODE_CHANNEL + ":*"; - public static final String TO_BBB_TRANSCODE_SYSTEM_CHAN = TO_BBB_TRANSCODE_CHANNEL + ":system"; - - private ITranscodingInGW transcodingInGW; - - public RedisMessageReceiver(ITranscodingInGW transcodingInGW) { - this.transcodingInGW = transcodingInGW; - } - - public void handleMessage(String pattern, String channel, String message) { - if (channel.equalsIgnoreCase(TO_BBB_TRANSCODE_SYSTEM_CHAN)) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - switch (messageName) { - case StartTranscoderRequestMessage.START_TRANSCODER_REQUEST: - processStartTranscoderRequestMessage(message); - break; - case UpdateTranscoderRequestMessage.UPDATE_TRANSCODER_REQUEST: - processUpdateTranscoderRequestMessage(message); - break; - case StopTranscoderRequestMessage.STOP_TRANSCODER_REQUEST: - processStopTranscoderRequestMessage(message); - break; - case StopMeetingTranscodersMessage.STOP_MEETING_TRANSCODERS: - processStopMeetingTranscodersMessage(message); - break; - case StartProbingRequestMessage.START_PROBING_REQUEST: - processStartProbingRequestMessage(message); - } - } - } - } - } - - private void processStartTranscoderRequestMessage(String json) { - StartTranscoderRequestMessage msg = StartTranscoderRequestMessage.fromJson(json); - transcodingInGW.startTranscoder(msg.meetingId, msg.transcoderId, msg.params); - } - - private void processUpdateTranscoderRequestMessage(String json) { - UpdateTranscoderRequestMessage msg = UpdateTranscoderRequestMessage.fromJson(json); - transcodingInGW.updateTranscoder(msg.meetingId, msg.transcoderId, msg.params); - } - - private void processStopTranscoderRequestMessage(String json) { - StopTranscoderRequestMessage msg = StopTranscoderRequestMessage.fromJson(json); - transcodingInGW.stopTranscoder(msg.meetingId, msg.transcoderId); - } - - private void processStopMeetingTranscodersMessage(String json) { - StopMeetingTranscodersMessage msg = StopMeetingTranscodersMessage.fromJson(json); - transcodingInGW.stopMeetingTranscoders(msg.meetingId); - } - - private void processStartProbingRequestMessage(String json) { - StartProbingRequestMessage msg = StartProbingRequestMessage.fromJson(json); - transcodingInGW.startProbing(msg.meetingId, msg.transcoderId, msg.params); - } -} diff --git a/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/util/Constants.java b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/util/Constants.java new file mode 100644 index 000000000000..fa000d175db2 --- /dev/null +++ b/akka-bbb-transcode/src/main/java/org/bigbluebutton/transcode/util/Constants.java @@ -0,0 +1,38 @@ +package org.bigbluebutton.transcode.util; + +public class Constants { + public static final String WIDTH_RATIO = "width_ratio"; + public static final String HEIGHT_RATIO = "height_ratio"; + public static final String CALLERNAME = "callername"; + public static final String VOICE_CONF = "voice_conf"; + public static final String LOCAL_IP_ADDRESS = "local_ip_address"; + public static final String LOCAL_VIDEO_PORT = "local_video_port"; + public static final String REMOTE_VIDEO_PORT = "remote_video_port"; + public static final String DESTINATION_IP_ADDRESS = "destination_ip_address"; + public static final String SIP_HOST = "sip_host"; + public static final String TRANSCODER_TYPE = "transcoder_type"; + public static final String INPUT = "input"; + public static final String OUTPUT = "output"; + public static final String TRANSCODE_RTP_TO_RTMP = "transcode_rtp_to_rtmp"; + public static final String TRANSCODE_RTMP_TO_RTP = "transcode_rtmp_to_rtp"; + public static final String TRANSCODE_FILE_TO_RTP = "transcode_file_to_rtp"; + public static final String TRANSCODE_FILE_TO_RTMP = "transcode_file_to_rtmp"; + public static final String TRANSCODE_H264_TO_H263 = "transcode_h264_to_h263"; + public static final String TRANSCODE_ROTATE_RIGHT = "transcode_rotate_right"; + public static final String TRANSCODE_ROTATE_LEFT = "transcode_rotate_left"; + public static final String TRANSCODE_ROTATE_UPSIDE_DOWN = "transcode_rotate_upside_down"; + public static final String MODULE = "module"; + public static final String PROBE_RTMP = "probe_rtmp"; + + public static final String HEADER = "header"; + public static final String CORE = "core"; + public static final String BODY = "body"; + public static final String ENVELOPE = "envelope"; + public static final String ROUTING = "routing"; + public static final String NAME = "name"; + public static final String SENDER = "sender"; + public static final String PARAMS = "params"; + public static final String TRANSCODER_ID = "transcoderId"; + public static final String MEETING_ID = "meetingId"; + public static final String VIDEO = "video"; +} \ No newline at end of file diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala index d38af2c86d47..d79c876bb580 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/Boot.scala @@ -1,13 +1,11 @@ package org.bigbluebutton -import akka.actor.{ ActorSystem, Props } -import scala.concurrent.duration._ -import redis.RedisClient -import scala.concurrent.{ Future, Await } -import scala.concurrent.ExecutionContext.Implicits.global +import akka.actor.ActorSystem + import org.bigbluebutton.endpoint.redis.{ RedisPublisher, AppsRedisSubscriberActor } -import org.bigbluebutton.transcode.pubsub.receivers.RedisMessageReceiver +import org.bigbluebutton.transcode.JsonMsgHdlrActor import org.bigbluebutton.transcode.core.TranscodingInGW +import org.bigbluebutton.transcode.bus.InJsonMsgBus object Boot extends App with SystemConfiguration { @@ -15,9 +13,11 @@ object Boot extends App with SystemConfiguration { val redisPublisher = new RedisPublisher(system) - var transcodingInGW = new TranscodingInGW(system, redisPublisher); + var inGW = new TranscodingInGW(system, redisPublisher) - val redisMsgReceiver = new RedisMessageReceiver(transcodingInGW); + val inJsonMsgBus = new InJsonMsgBus + val redisMessageHandlerActor = system.actorOf(JsonMsgHdlrActor.props(inGW)) + inJsonMsgBus.subscribe(redisMessageHandlerActor, toAkkaTranscodeJsonChannel) - val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver), "redis-subscriber") + val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, inJsonMsgBus), "redis-subscriber") } diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala index 9bfb7098867f..47eda79c6466 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/SystemConfiguration.scala @@ -17,4 +17,9 @@ trait SystemConfiguration { lazy val _videoconfLogoImagePath = Try(config.getString("videoconference.videoconf-logo-image-path")).getOrElse("") lazy val _enableUserVideoSubtitle = Try(config.getString("videoconference.enable-user-video-subtitle").toBoolean).getOrElse(false) lazy val _sipVideoResolution = Try(config.getString("videoconference.sip-video-resolution")).getOrElse("") + + lazy val toAkkaTranscodeRedisChannel = Try(config.getString("redis.toAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:to-bbb-transcode:system") + lazy val fromAkkaTranscodeRedisChannel = Try(config.getString("redis.fromAkkaTranscodeRedisChannel")).getOrElse("bigbluebutton:from-bbb-transcode:system") + lazy val toAkkaTranscodeJsonChannel = Try(config.getString("eventBus.toAkkaTranscodeJsonChannel")).getOrElse("to-akka-transcode-json-channel") + lazy val fromAkkaTranscodeJsonChannel = Try(config.getString("eventBus.fromAkkaTranscodeJsonChannel")).getOrElse("from-akka-transcode-json-channel") } diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala index 3fc85256acad..96cc6d77c630 100755 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala @@ -1,44 +1,56 @@ package org.bigbluebutton.endpoint.redis -import akka.actor.Props +import java.io.PrintWriter +import java.io.StringWriter import java.net.InetSocketAddress + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt + +import akka.actor.ActorSystem +import akka.actor.OneForOneStrategy +import akka.actor.Props +import akka.actor.SupervisorStrategy.Resume + import redis.actors.RedisSubscriberActor -import redis.api.pubsub.{ PMessage, Message } -import scala.concurrent.duration._ -import akka.actor.ActorRef -import akka.actor.actorRef2Scala -import org.bigbluebutton.SystemConfiguration -import org.bigbluebutton.transcode.pubsub.receivers.RedisMessageReceiver +import redis.api.pubsub.Message +import redis.api.pubsub.PMessage import redis.api.servers.ClientSetname -import org.bigbluebutton.common.converters.FromJsonDecoder -import org.bigbluebutton.common.messages.PubSubPongMessage -import akka.actor.ActorSystem -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global + +import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.transcode.bus.{ InJsonMsg, InJsonMsgBus, ReceivedJsonMsg } object AppsRedisSubscriberActor extends SystemConfiguration { - val channels = Seq("time") + val channels = Seq(toAkkaTranscodeRedisChannel) val patterns = Seq("bigbluebutton:to-bbb-transcode:*") - def props(system: ActorSystem, msgReceiver: RedisMessageReceiver): Props = - Props(classOf[AppsRedisSubscriberActor], system, msgReceiver, + def props(system: ActorSystem, msgBus: InJsonMsgBus): Props = + Props(classOf[AppsRedisSubscriberActor], system, msgBus, redisHost, redisPort, channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") } -class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver, redisHost: String, - redisPort: Int, channels: Seq[String] = Nil, patterns: Seq[String] = Nil) +class AppsRedisSubscriberActor(val system: ActorSystem, + msgBus: InJsonMsgBus, redisHost: String, + redisPort: Int, + channels: Seq[String] = Nil, patterns: Seq[String] = Nil) extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort), - channels, patterns) { - - val decoder = new FromJsonDecoder() + channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) with SystemConfiguration { + + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + case e: Exception => { + val sw: StringWriter = new StringWriter() + sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n") + e.printStackTrace(new PrintWriter(sw)) + log.error(sw.toString()) + Resume + } + } var lastPongReceivedOn = 0L system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage()) - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli write(ClientSetname("BbbTranscodeAkkaSub").encodedRequest) def checkPongMessage() { @@ -50,27 +62,15 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag } def onMessage(message: Message) { - log.debug(s"message received: $message") + if (message.channel == toAkkaTranscodeRedisChannel) { + val receivedJsonMessage = new ReceivedJsonMsg(message.channel, message.data.utf8String) + log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n") + msgBus.publish(InJsonMsg(toAkkaTranscodeJsonChannel, receivedJsonMessage)) + } } def onPMessage(pmessage: PMessage) { - log.debug(s"pattern message received: $pmessage") - - val msg = decoder.decodeMessage(pmessage.data) - - if (msg != null) { - msg match { - case m: PubSubPongMessage => { - if (m.payload.system == "BbbTranscode") { - lastPongReceivedOn = System.currentTimeMillis() - } - } - case _ => // do nothing - } - } else { - msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data) - } - + // log.debug(s"pattern message received: $pmessage") } def handleMessage(msg: String) { diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala index 1f491cef6349..9a9720d3bcbd 100755 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/endpoint/redis/RedisPublisher.scala @@ -1,12 +1,9 @@ package org.bigbluebutton.endpoint.redis -import akka.actor.Props import redis.RedisClient import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global import akka.actor.ActorSystem -import scala.concurrent.Await -import akka.actor.Actor import org.bigbluebutton.SystemConfiguration import org.bigbluebutton.common.converters.ToJsonEncoder diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgDeserializer.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgDeserializer.scala new file mode 100644 index 000000000000..7a85ba73ae3a --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgDeserializer.scala @@ -0,0 +1,97 @@ +package org.bigbluebutton.transcode + +import org.bigbluebutton.common2.msgs._ + +import com.fasterxml.jackson.databind.JsonNode +import scala.collection.JavaConverters + +trait JsonMsgDeserializer { + this: JsonMsgHdlrActor => + + object JsonDeserializer extends Deserializer + + def routeStartProbingSysReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + def deserialize(jsonNode: JsonNode): Option[StartProbingSysReqMsg] = { + val (result, error) = JsonDeserializer.toBbbCommonMsg[StartProbingSysReqMsg](jsonNode) + result match { + case Some(msg) => Some(msg.asInstanceOf[StartProbingSysReqMsg]) + case None => + log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode) + None + } + } + for { + m <- deserialize(jsonNode) + } yield { + inGW.startProbing(m.header.meetingId, m.body.transcoderId, JavaConverters.mapAsJavaMap(m.body.params)) + } + } + + def routeStartTranscoderSysReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + def deserialize(jsonNode: JsonNode): Option[StartTranscoderSysReqMsg] = { + val (result, error) = JsonDeserializer.toBbbCommonMsg[StartTranscoderSysReqMsg](jsonNode) + result match { + case Some(msg) => Some(msg.asInstanceOf[StartTranscoderSysReqMsg]) + case None => + log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode) + None + } + } + for { + m <- deserialize(jsonNode) + } yield { + inGW.startTranscoder(m.header.meetingId, m.body.transcoderId, JavaConverters.mapAsJavaMap(m.body.params)) + } + } + + def routeStopTranscoderSysReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + def deserialize(jsonNode: JsonNode): Option[StopTranscoderSysReqMsg] = { + val (result, error) = JsonDeserializer.toBbbCommonMsg[StopTranscoderSysReqMsg](jsonNode) + result match { + case Some(msg) => Some(msg.asInstanceOf[StopTranscoderSysReqMsg]) + case None => + log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode) + None + } + } + for { + m <- deserialize(jsonNode) + } yield { + inGW.stopTranscoder(m.header.meetingId, m.body.transcoderId) + } + } + + def routeUpdateTranscoderSysReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + def deserialize(jsonNode: JsonNode): Option[UpdateTranscoderSysReqMsg] = { + val (result, error) = JsonDeserializer.toBbbCommonMsg[UpdateTranscoderSysReqMsg](jsonNode) + result match { + case Some(msg) => Some(msg.asInstanceOf[UpdateTranscoderSysReqMsg]) + case None => + log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode) + None + } + } + for { + m <- deserialize(jsonNode) + } yield { + inGW.updateTranscoder(m.header.meetingId, m.body.transcoderId, JavaConverters.mapAsJavaMap(m.body.params)) + } + } + + def routeStopMeetingTranscodersSysCmdMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + def deserialize(jsonNode: JsonNode): Option[StopMeetingTranscodersSysCmdMsg] = { + val (result, error) = JsonDeserializer.toBbbCommonMsg[StopMeetingTranscodersSysCmdMsg](jsonNode) + result match { + case Some(msg) => Some(msg.asInstanceOf[StopMeetingTranscodersSysCmdMsg]) + case None => + log.error("Failed to deserialize message: error: {} \n msg: {}", error, jsonNode) + None + } + } + for { + m <- deserialize(jsonNode) + } yield { + inGW.stopMeetingTranscoders(m.header.meetingId) + } + } +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgHdlrActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgHdlrActor.scala new file mode 100644 index 000000000000..5812b0436add --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/JsonMsgHdlrActor.scala @@ -0,0 +1,47 @@ +package org.bigbluebutton.transcode + +import akka.actor.{ Actor, ActorLogging, Props } + +import com.fasterxml.jackson.databind.JsonNode + +import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.common2.msgs._ +import org.bigbluebutton.transcode.bus.ReceivedJsonMsg +import org.bigbluebutton.transcode.core.TranscodingInGW + +object JsonMsgHdlrActor { + def props(inGW: TranscodingInGW): Props = Props(classOf[JsonMsgHdlrActor], inGW) +} + +class JsonMsgHdlrActor(val inGW: TranscodingInGW) extends Actor with ActorLogging + with SystemConfiguration with JsonMsgDeserializer { + def receive = { + case msg: ReceivedJsonMsg => + log.debug("handling {} - {}", msg.channel, msg.data) + handleReceivedJsonMessage(msg) + case _ => // do nothing + } + + def handleReceivedJsonMessage(msg: ReceivedJsonMsg): Unit = { + for { + envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data) + } yield handle(envJsonNode.envelope, envJsonNode.core) + } + + def handle(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + log.debug("Route envelope name " + envelope.name) + envelope.name match { + case StartProbingSysReqMsg.NAME => + routeStartProbingSysReqMsg(envelope, jsonNode) + case StartTranscoderSysReqMsg.NAME => + routeStartTranscoderSysReqMsg(envelope, jsonNode) + case StopTranscoderSysReqMsg.NAME => + routeStopTranscoderSysReqMsg(envelope, jsonNode) + case UpdateTranscoderSysReqMsg.NAME => + routeUpdateTranscoderSysReqMsg(envelope, jsonNode) + case StopMeetingTranscodersSysCmdMsg.NAME => + routeStopMeetingTranscodersSysCmdMsg(envelope, jsonNode) + case _ => // do nothing + } + } +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/OutMessages.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/OutMessages.scala index fa650f308fd9..1df1166d25b4 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/OutMessages.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/api/OutMessages.scala @@ -1,12 +1,10 @@ package org.bigbluebutton.transcode.api -import scala.collection.mutable.HashMap - abstract class OutMessage -case class StartTranscoderReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage +case class StartTranscoderReply(meetingId: String, transcoderId: String, params: Map[String, String]) extends OutMessage case class StopTranscoderReply(meetingId: String, transcoderId: String) extends OutMessage -case class UpdateTranscoderReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage -case class StartProbingReply(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage - -case class TranscoderStatusUpdate(meetingId: String, transcoderId: String, params: HashMap[String, String]) extends OutMessage +case class UpdateTranscoderReply(meetingId: String, transcoderId: String, params: Map[String, String]) extends OutMessage +case class StartProbingReply(meetingId: String, transcoderId: String, params: Map[String, String]) extends OutMessage +// Is this used? StartProbingReply sends the same message... +case class TranscoderStatusUpdate(meetingId: String, transcoderId: String, params: Map[String, String]) extends OutMessage diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/bus/InJsonMsgBus.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/bus/InJsonMsgBus.scala new file mode 100644 index 000000000000..70a883b4d14c --- /dev/null +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/bus/InJsonMsgBus.scala @@ -0,0 +1,31 @@ +package org.bigbluebutton.transcode.bus + +import akka.actor.ActorRef +import akka.event.{ EventBus, LookupClassification } + +case class ReceivedJsonMsg(channel: String, data: String) +case class InJsonMsg(val topic: String, val payload: ReceivedJsonMsg) + +class InJsonMsgBus extends EventBus with LookupClassification { + type Event = InJsonMsg + type Classifier = String + type Subscriber = ActorRef + + // is used for extracting the classifier from the incoming events + override protected def classify(event: Event): Classifier = event.topic + + // will be invoked for each event for all subscribers which registered themselves + // for the event’s classifier + override protected def publish(event: Event, subscriber: Subscriber): Unit = { + subscriber ! event.payload + } + + // must define a full order over the subscribers, expressed as expected from + // `java.lang.Comparable.compare` + override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = + a.compareTo(b) + + // determines the initial size of the index data structure + // used internally (i.e. the expected number of different classifiers) + override protected def mapSize: Int = 128 +} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/MessageSenderActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/MessageSenderActor.scala index af2ce5e9f976..849d4f332046 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/MessageSenderActor.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/MessageSenderActor.scala @@ -1,21 +1,12 @@ package org.bigbluebutton.transcode.core import akka.actor.Actor -import akka.actor.ActorContext import akka.actor.ActorLogging import akka.actor.Props import org.bigbluebutton.transcode.api._ import org.bigbluebutton.endpoint.redis.RedisPublisher - -import collection.JavaConverters._ -import scala.collection.JavaConversions._ - -import org.bigbluebutton.common.messages.StartTranscoderReplyMessage -import org.bigbluebutton.common.messages.StopTranscoderReplyMessage -import org.bigbluebutton.common.messages.TranscoderStatusUpdateMessage -import org.bigbluebutton.common.messages.UpdateTranscoderReplyMessage -import org.bigbluebutton.common.messages.StartProbingReplyMessage -import org.bigbluebutton.common.messages.MessagingConstants +import org.bigbluebutton.common2.msgs._ +import org.bigbluebutton.common2.util.JsonUtil object MessageSenderActor { def props(msgSender: RedisPublisher): Props = @@ -25,6 +16,9 @@ object MessageSenderActor { class MessageSenderActor(val msgSender: RedisPublisher) extends Actor with ActorLogging { + val fromBbbTranscodeRedisChannel = "bigbluebutton:from-bbb-transcode:system" + val routing = collection.immutable.HashMap("sender" -> "bbb-transcode") + def receive = { case msg: StartTranscoderReply => handleStartTranscoderReply(msg) case msg: StopTranscoderReply => handleStopTranscoderReply(msg) @@ -35,50 +29,68 @@ class MessageSenderActor(val msgSender: RedisPublisher) } private def handleStartTranscoderReply(msg: StartTranscoderReply) { - System.out.println("Sending StartTranscoderReplyMessage. Params: [\n" + System.out.println("Sending StartTranscoderSysRespMsg. Params: [\n" + "meetingId = " + msg.meetingId + "\n" + "transcoderId = " + msg.transcoderId + "\n" + "params = " + msg.params.mkString(", ") + "\n]\n") - - val str = new StartTranscoderReplyMessage(msg.meetingId, msg.transcoderId, msg.params) - msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + val header = BbbCoreHeaderWithMeetingId(StartTranscoderSysRespMsg.NAME, msg.meetingId) + val body = StartTranscoderSysRespMsgBody(msg.transcoderId, msg.params) + val envelope = BbbCoreEnvelope(StartTranscoderSysRespMsg.NAME, routing) + val evt = new StartTranscoderSysRespMsg(header, body) + val msgEvent = BbbCommonEnvCoreMsg(envelope, evt) + val json = JsonUtil.toJson(msgEvent) + msgSender.publish(fromBbbTranscodeRedisChannel, json) } private def handleStopTranscoderReply(msg: StopTranscoderReply) { - System.out.println("Sending StopTranscoderReplyMessage. Params: [\n" + System.out.println("Sending StopTranscoderSysRespMsg. Params: [\n" + "meetingId = " + msg.meetingId + "\n" + "transcoderId = " + msg.transcoderId + "\n]\n") - val str = new StopTranscoderReplyMessage(msg.meetingId, msg.transcoderId) - msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + val header = BbbCoreHeaderWithMeetingId(StopTranscoderSysRespMsg.NAME, msg.meetingId) + val body = StopTranscoderSysRespMsgBody(msg.transcoderId) + val envelope = BbbCoreEnvelope(StopTranscoderSysRespMsg.NAME, routing) + val evt = new StopTranscoderSysRespMsg(header, body) + val msgEvent = BbbCommonEnvCoreMsg(envelope, evt) + val json = JsonUtil.toJson(msgEvent) + msgSender.publish(fromBbbTranscodeRedisChannel, json) } private def handleUpdateTranscoderReply(msg: UpdateTranscoderReply) { - System.out.println("Sending UpdateTranscoderReplyMessage. Params: [\n" + System.out.println("Sending UpdateTranscoderSysRespMsg. Params: [\n" + "meetingId = " + msg.meetingId + "\n" + "transcoderId = " + msg.transcoderId + "\n" + "params = " + msg.params.mkString(", ") + "\n]\n") - - val str = new UpdateTranscoderReplyMessage(msg.meetingId, msg.transcoderId, msg.params) - msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + val header = BbbCoreHeaderWithMeetingId(UpdateTranscoderSysRespMsg.NAME, msg.meetingId) + val body = UpdateTranscoderSysRespMsgBody(msg.transcoderId, msg.params) + val envelope = BbbCoreEnvelope(UpdateTranscoderSysRespMsg.NAME, routing) + val evt = new UpdateTranscoderSysRespMsg(header, body) + val msgEvent = BbbCommonEnvCoreMsg(envelope, evt) + val json = JsonUtil.toJson(msgEvent) + msgSender.publish(fromBbbTranscodeRedisChannel, json) } - private def handleTranscoderStatusUpdate(msg: TranscoderStatusUpdate) { - System.out.println("Sending TranscoderStatusUpdateMessage. Params: [\n" + private def handleStartProbingReply(msg: StartProbingReply) { + System.out.println("Sending StartProbingSysRespMsg. Params: [\n" + "meetingId = " + msg.meetingId + "\n" + "transcoderId = " + msg.transcoderId + "\n" + "params = " + msg.params.mkString(", ") + "\n]\n") - - val str = new TranscoderStatusUpdateMessage(msg.meetingId, msg.transcoderId, msg.params) - msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + val header = BbbCoreHeaderWithMeetingId(StartProbingSysRespMsg.NAME, msg.meetingId) + val body = StartProbingSysRespMsgBody(msg.transcoderId, msg.params) + val envelope = BbbCoreEnvelope(StartProbingSysRespMsg.NAME, routing) + val evt = new StartProbingSysRespMsg(header, body) + val msgEvent = BbbCommonEnvCoreMsg(envelope, evt) + val json = JsonUtil.toJson(msgEvent) + msgSender.publish(fromBbbTranscodeRedisChannel, json) } - private def handleStartProbingReply(msg: StartProbingReply) { - System.out.println("Sending StartProbingReplyMessage. Params: [\n" + private def handleTranscoderStatusUpdate(msg: TranscoderStatusUpdate) { + System.out.println("Sending TranscoderStatusUpdateMessage. Params: [\n" + "meetingId = " + msg.meetingId + "\n" + "transcoderId = " + msg.transcoderId + "\n" + "params = " + msg.params.mkString(", ") + "\n]\n") + + /* TODO: Check if this is really needed val str = new TranscoderStatusUpdateMessage(msg.meetingId, msg.transcoderId, msg.params) - msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson()) + msgSender.publish(MessagingConstants.FROM_BBB_TRANSCODE_SYSTEM_CHAN, str.toJson())*/ } - -} +} \ No newline at end of file diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersService.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersService.scala index 1991583dee02..56d0db147817 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersService.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodersService.scala @@ -1,7 +1,6 @@ package org.bigbluebutton.transcode.core import org.bigbluebutton.SystemConfiguration -import org.bigbluebutton.transcode.core.ffmpeg.FFmpegUtils class TranscodersService {} diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingActor.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingActor.scala index 0fcc1d4fa4fd..b4b191014911 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingActor.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingActor.scala @@ -2,14 +2,10 @@ package org.bigbluebutton.transcode.core import akka.actor._ import akka.actor.ActorLogging -import scala.collection.mutable.HashMap import org.bigbluebutton.endpoint.redis.RedisPublisher import org.bigbluebutton.transcode.api._ import org.bigbluebutton.SystemConfiguration -import scala.collection._ -import scala.collection.JavaConversions._ -import org.bigbluebutton.common.messages.Constants -import org.bigbluebutton.transcode.core.apps.{ TranscodingObserverApp } +import org.bigbluebutton.transcode.core.apps.TranscodingObserverApp object TranscodingActor extends SystemConfiguration { def props(system: ActorSystem, messageSender: RedisPublisher): Props = diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/TranscodingInGW.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingInGW.scala similarity index 84% rename from akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/TranscodingInGW.scala rename to akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingInGW.scala index 57abcc4b238e..41e9ea15530b 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/TranscodingInGW.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/TranscodingInGW.scala @@ -1,14 +1,11 @@ package org.bigbluebutton.transcode.core -import org.bigbluebutton.transcode.core.api.ITranscodingInGW -import org.bigbluebutton.endpoint.redis.RedisPublisher -import scala.collection.JavaConversions._ -import java.util.ArrayList -import scala.collection.mutable.ArrayBuffer import akka.actor.ActorSystem + +import org.bigbluebutton.endpoint.redis.RedisPublisher import org.bigbluebutton.transcode.api._ -class TranscodingInGW(val system: ActorSystem, messageSender: RedisPublisher) extends ITranscodingInGW { +class TranscodingInGW(val system: ActorSystem, messageSender: RedisPublisher) { val log = system.log val transcodingActor = system.actorOf(TranscodingActor.props(system, messageSender), "bbb-transcoding-manager") @@ -31,5 +28,4 @@ class TranscodingInGW(val system: ActorSystem, messageSender: RedisPublisher) ex def startProbing(meetingId: String, transcoderId: String, params: java.util.Map[String, String]) { transcodingActor ! new StartProbingRequest(meetingId, transcoderId, params) } - -} +} \ No newline at end of file diff --git a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/apps/TranscodingObserverApp.scala b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/apps/TranscodingObserverApp.scala index a4ad9396d466..8f399b079b92 100644 --- a/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/apps/TranscodingObserverApp.scala +++ b/akka-bbb-transcode/src/main/scala/org/bigbluebutton/transcode/core/apps/TranscodingObserverApp.scala @@ -3,7 +3,7 @@ package org.bigbluebutton.transcode.core.apps import akka.actor.ActorRef import org.bigbluebutton.transcode.core.TranscodingActor import org.bigbluebutton.transcode.api._ -import org.bigbluebutton.common.messages.Constants +import org.bigbluebutton.transcode.util.Constants import org.bigbluebutton.transcode.core.ffmpeg.FFmpegConstants import scala.collection.JavaConversions._ @@ -41,14 +41,14 @@ trait TranscodingObserverApp { log.info("\n > Transcoder with id = {} started", msg.getTranscoderId()) val params = new scala.collection.mutable.HashMap[String, String] params += Constants.OUTPUT -> msg.getOutput() - messageSenderActor ! new StartTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params) + messageSenderActor ! new StartTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params.toMap) } def handleUpdateVideoTranscoderReply(msg: UpdateVideoTranscoderReply) = { log.info("\n > Transcoder with id = {} updated", msg.getTranscoderId()) val params = new scala.collection.mutable.HashMap[String, String] params += Constants.OUTPUT -> msg.getOutput() - messageSenderActor ! new UpdateTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params) + messageSenderActor ! new UpdateTranscoderReply(msg.getMeetingId(), msg.getTranscoderId(), params.toMap) } def handleDestroyVideoTranscoderReply(msg: DestroyVideoTranscoderReply) = { @@ -60,7 +60,7 @@ trait TranscodingObserverApp { log.info("\n > Transcoder with id = {} restarted", msg.getTranscoderId()) val params = new scala.collection.mutable.HashMap[String, String] params += Constants.OUTPUT -> msg.getOutput() - messageSenderActor ! new TranscoderStatusUpdate(msg.getMeetingId(), msg.getTranscoderId(), params) + messageSenderActor ! new TranscoderStatusUpdate(msg.getMeetingId(), msg.getTranscoderId(), params.toMap) } def handleStartVideoProbingReply(msg: StartVideoProbingReply) = { @@ -70,7 +70,7 @@ trait TranscodingObserverApp { val params = new scala.collection.mutable.HashMap[String, String] params += Constants.WIDTH_RATIO -> result.getOrElse(FFmpegConstants.WIDTH, "") params += Constants.HEIGHT_RATIO -> result.getOrElse(FFmpegConstants.HEIGHT, "") - messageSenderActor ! new StartProbingReply(msg.getMeetingId(), msg.getTranscoderId(), params) + messageSenderActor ! new StartProbingReply(msg.getMeetingId(), msg.getTranscoderId(), params.toMap) case _ => log.debug("Could not send ffprobe reply : failed to get the new resolution"); } } diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java index 4cffed762966..e1c733f45973 100755 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/Constants.java @@ -140,22 +140,4 @@ public class Constants { public static final String GUEST_POLICY = "guest_policy"; public static final String SET_BY = "set_by"; public static final String METADATA = "metadata"; - public static final String LOCAL_IP_ADDRESS = "local_ip_address"; - public static final String LOCAL_VIDEO_PORT = "local_video_port"; - public static final String REMOTE_VIDEO_PORT = "remote_video_port"; - public static final String DESTINATION_IP_ADDRESS = "destination_ip_address"; - public static final String SIP_HOST = "sip_host"; - public static final String TRANSCODER_TYPE = "transcoder_type"; - public static final String INPUT = "input"; - public static final String OUTPUT = "output"; - public static final String TRANSCODE_RTP_TO_RTMP = "transcode_rtp_to_rtmp"; - public static final String TRANSCODE_RTMP_TO_RTP = "transcode_rtmp_to_rtp"; - public static final String TRANSCODE_FILE_TO_RTP = "transcode_file_to_rtp"; - public static final String TRANSCODE_FILE_TO_RTMP = "transcode_file_to_rtmp"; - public static final String TRANSCODE_H264_TO_H263 = "transcode_h264_to_h263"; - public static final String TRANSCODE_ROTATE_RIGHT = "transcode_rotate_right"; - public static final String TRANSCODE_ROTATE_LEFT = "transcode_rotate_left"; - public static final String TRANSCODE_ROTATE_UPSIDE_DOWN = "transcode_rotate_upside_down"; - public static final String MODULE = "module"; - public static final String PROBE_RTMP = "probe_rtmp"; } diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingReplyMessage.java deleted file mode 100644 index 44f4f3e23554..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingReplyMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class StartProbingReplyMessage implements IBigBlueButtonMessage { - public static final String START_PROBING_REPLY = "start_probing_reply_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - public static final String PARAMS = "params"; - - public final String meetingId; - public final String transcoderId; - public final Map params; - - public StartProbingReplyMessage(String meetingId, String transcoderId, Map params) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - this.params = params; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - payload.put(PARAMS, params); - - java.util.HashMap header = MessageBuilder.buildHeader(START_PROBING_REPLY, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static StartProbingReplyMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (START_PROBING_REPLY.equals(messageName)) { - if (payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID) - && payload.has(PARAMS)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); - return new StartProbingReplyMessage(meetingId, transcoderId, params); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingRequestMessage.java deleted file mode 100644 index b9d906728319..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartProbingRequestMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class StartProbingRequestMessage implements IBigBlueButtonMessage { - public static final String START_PROBING_REQUEST = "start_probing_request_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - public static final String PARAMS = "params"; - - public final String meetingId; - public final String transcoderId; - public final Map params; - - public StartProbingRequestMessage(String meetingId, String transcoderId, Map params) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - this.params = params; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - payload.put(PARAMS, params); - - java.util.HashMap header = MessageBuilder.buildHeader(START_PROBING_REQUEST, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static StartProbingRequestMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (START_PROBING_REQUEST.equals(messageName)) { - if (payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID) - && payload.has(PARAMS)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); - return new StartProbingRequestMessage(meetingId, transcoderId, params); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderReplyMessage.java deleted file mode 100644 index 33d71db1130d..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderReplyMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class StartTranscoderReplyMessage implements IBigBlueButtonMessage { - public static final String START_TRANSCODER_REPLY = "start_transcoder_reply_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - public static final String PARAMS = "params"; - - public final String meetingId; - public final String transcoderId; - public final Map params; - - public StartTranscoderReplyMessage(String meetingId, String transcoderId, Map params) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - this.params = params; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - payload.put(PARAMS, params); - - java.util.HashMap header = MessageBuilder.buildHeader(START_TRANSCODER_REPLY, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static StartTranscoderReplyMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (START_TRANSCODER_REPLY.equals(messageName)) { - if ( payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID) - && payload.has(PARAMS)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); - return new StartTranscoderReplyMessage(meetingId, transcoderId, params); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderRequestMessage.java deleted file mode 100644 index 728093dc2299..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StartTranscoderRequestMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class StartTranscoderRequestMessage implements IBigBlueButtonMessage { - public static final String START_TRANSCODER_REQUEST = "start_transcoder_request_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - public static final String PARAMS = "params"; - - public final String meetingId; - public final String transcoderId; - public final Map params; - - public StartTranscoderRequestMessage(String meetingId, String transcoderId, Map params) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - this.params = params; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - payload.put(PARAMS, params); - - java.util.HashMap header = MessageBuilder.buildHeader(START_TRANSCODER_REQUEST, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static StartTranscoderRequestMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (START_TRANSCODER_REQUEST.equals(messageName)) { - if (payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID) - && payload.has(PARAMS)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); - return new StartTranscoderRequestMessage(meetingId, transcoderId, params); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopMeetingTranscodersMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopMeetingTranscodersMessage.java deleted file mode 100644 index c277ae4fd4f6..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopMeetingTranscodersMessage.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class StopMeetingTranscodersMessage implements IBigBlueButtonMessage { - public static final String STOP_MEETING_TRANSCODERS = "stop_meeting_transcoders_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - - public final String meetingId; - - public StopMeetingTranscodersMessage(String meetingId) { - this.meetingId = meetingId; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - - java.util.HashMap header = MessageBuilder.buildHeader(STOP_MEETING_TRANSCODERS, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static StopMeetingTranscodersMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (STOP_MEETING_TRANSCODERS.equals(messageName)) { - if (payload.has(MEETING_ID)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - return new StopMeetingTranscodersMessage(meetingId); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderReplyMessage.java deleted file mode 100644 index d8aee64bb5e4..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderReplyMessage.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class StopTranscoderReplyMessage implements IBigBlueButtonMessage { - public static final String STOP_TRANSCODER_REPLY = "stop_transcoder_reply_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - - public final String meetingId; - public final String transcoderId; - - public StopTranscoderReplyMessage(String meetingId, String transcoderId) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - - java.util.HashMap header = MessageBuilder.buildHeader(STOP_TRANSCODER_REPLY, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static StopTranscoderReplyMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (STOP_TRANSCODER_REPLY.equals(messageName)) { - if ( payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - return new StopTranscoderReplyMessage(meetingId, transcoderId); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderRequestMessage.java deleted file mode 100644 index 7ac74c17298a..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/StopTranscoderRequestMessage.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class StopTranscoderRequestMessage implements IBigBlueButtonMessage { - public static final String STOP_TRANSCODER_REQUEST = "stop_transcoder_request_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - - public final String meetingId; - public final String transcoderId; - - public StopTranscoderRequestMessage(String meetingId, String transcoderId) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - - java.util.HashMap header = MessageBuilder.buildHeader(STOP_TRANSCODER_REQUEST, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static StopTranscoderRequestMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (STOP_TRANSCODER_REQUEST.equals(messageName)) { - if (payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - return new StopTranscoderRequestMessage(meetingId, transcoderId); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/TranscoderStatusUpdateMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/TranscoderStatusUpdateMessage.java deleted file mode 100644 index a48a85db1891..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/TranscoderStatusUpdateMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class TranscoderStatusUpdateMessage implements IBigBlueButtonMessage { - public static final String TRANSCODER_STATUS_UPDATE = "transcoder_status_update"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - public static final String PARAMS = "params"; - - public final String meetingId; - public final String transcoderId; - public final Map params; - - public TranscoderStatusUpdateMessage(String meetingId, String transcoderId, Map params) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - this.params = params; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - payload.put(PARAMS, params); - - java.util.HashMap header = MessageBuilder.buildHeader(TRANSCODER_STATUS_UPDATE, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static TranscoderStatusUpdateMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (TRANSCODER_STATUS_UPDATE.equals(messageName)) { - if (payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID) - && payload.has(PARAMS)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); - return new TranscoderStatusUpdateMessage(meetingId, transcoderId, params); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderReplyMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderReplyMessage.java deleted file mode 100644 index a8c8ce533848..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderReplyMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class UpdateTranscoderReplyMessage implements IBigBlueButtonMessage { - public static final String UPDATE_TRANSCODER_REPLY = "update_transcoder_reply_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - public static final String PARAMS = "params"; - - public final String meetingId; - public final String transcoderId; - public final Map params; - - public UpdateTranscoderReplyMessage(String meetingId, String transcoderId, Map params) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - this.params = params; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - payload.put(PARAMS, params); - - java.util.HashMap header = MessageBuilder.buildHeader(UPDATE_TRANSCODER_REPLY, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static UpdateTranscoderReplyMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (UPDATE_TRANSCODER_REPLY.equals(messageName)) { - if ( payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID) - && payload.has(PARAMS)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); - return new UpdateTranscoderReplyMessage(meetingId, transcoderId, params); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderRequestMessage.java b/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderRequestMessage.java deleted file mode 100644 index 3fd6904e6a32..000000000000 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common/messages/payload/UpdateTranscoderRequestMessage.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.bigbluebutton.common.messages; - -import java.util.Map; -import java.util.HashMap; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; - - -public class UpdateTranscoderRequestMessage implements IBigBlueButtonMessage { - public static final String UPDATE_TRANSCODER_REQUEST = "update_transcoder_request_message"; - public static final String VERSION = "0.0.1"; - - public static final String MEETING_ID = "meeting_id"; - public static final String TRANSCODER_ID = "transcoder_id"; - public static final String PARAMS = "params"; - - public final String meetingId; - public final String transcoderId; - public final Map params; - - public UpdateTranscoderRequestMessage(String meetingId, String transcoderId, Map params) { - this.meetingId = meetingId; - this.transcoderId = transcoderId; - this.params = params; - } - - public String toJson() { - HashMap payload = new HashMap(); - payload.put(MEETING_ID, meetingId); - payload.put(TRANSCODER_ID, transcoderId); - payload.put(PARAMS, params); - - java.util.HashMap header = MessageBuilder.buildHeader(UPDATE_TRANSCODER_REQUEST, VERSION, null); - - return MessageBuilder.buildJson(header, payload); - } - - public static UpdateTranscoderRequestMessage fromJson(String message) { - JsonParser parser = new JsonParser(); - JsonObject obj = (JsonObject) parser.parse(message); - - if (obj.has("header") && obj.has("payload")) { - JsonObject header = (JsonObject) obj.get("header"); - JsonObject payload = (JsonObject) obj.get("payload"); - - if (header.has("name")) { - String messageName = header.get("name").getAsString(); - if (UPDATE_TRANSCODER_REQUEST.equals(messageName)) { - if (payload.has(MEETING_ID) - && payload.has(TRANSCODER_ID) - && payload.has(PARAMS)){ - String meetingId = payload.get(MEETING_ID).getAsString(); - String transcoderId = payload.get(TRANSCODER_ID).getAsString(); - Map params = new Gson().fromJson(payload.get(PARAMS).toString(), new TypeToken>() {}.getType()); - return new UpdateTranscoderRequestMessage(meetingId, transcoderId, params); - } - } - } - } - return null; - } -} diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/TranscoderMsgs.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/TranscoderMsgs.scala new file mode 100644 index 000000000000..c3893918ab3c --- /dev/null +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/TranscoderMsgs.scala @@ -0,0 +1,43 @@ +package org.bigbluebutton.common2.msgs + +/* In Messages */ +object StartProbingSysReqMsg {val NAME = "StartProbingSysReqMsg"} +case class StartProbingSysReqMsg(header: BbbCoreHeaderWithMeetingId, body: StartProbingSysReqMsgBody) extends BbbCoreMsg +case class StartProbingSysReqMsgBody(transcoderId: String, params: Map[String, String]) + +object StartTranscoderSysReqMsg {val NAME = "StartTranscoderSysReqMsg"} +case class StartTranscoderSysReqMsg(header: BbbCoreHeaderWithMeetingId, body: StartTranscoderSysReqMsgBody) extends BbbCoreMsg +case class StartTranscoderSysReqMsgBody(transcoderId: String, params: Map[String, String]) + +object StopTranscoderSysReqMsg {val NAME = "StopTranscoderSysReqMsg"} +case class StopTranscoderSysReqMsg(header: BbbCoreHeaderWithMeetingId, body: StopTranscoderSysReqMsgBody) extends BbbCoreMsg +case class StopTranscoderSysReqMsgBody(transcoderId: String) + +object UpdateTranscoderSysReqMsg {val NAME = "UpdateTranscoderSysReqMsg"} +case class UpdateTranscoderSysReqMsg(header: BbbCoreHeaderWithMeetingId, body: UpdateTranscoderSysReqMsgBody) extends BbbCoreMsg +case class UpdateTranscoderSysReqMsgBody(transcoderId: String, params: Map[String, String]) + +object TranscoderStatusUpdateSysCmdMsg {val NAME = "TranscoderStatusUpdateSysCmdMsg"} +case class TranscoderStatusUpdateSysCmdMsg(header: BbbCoreHeaderWithMeetingId, body: TranscoderStatusUpdateSysCmdMsgBody) extends BbbCoreMsg +case class TranscoderStatusUpdateSysCmdMsgBody(transcoderId: String, params: Map[String, String]) + +object StopMeetingTranscodersSysCmdMsg {val NAME = "StopMeetingTranscodersSysCmdMsg"} +case class StopMeetingTranscodersSysCmdMsg(header: BbbCoreHeaderWithMeetingId, body: StopMeetingTranscodersSysCmdMsgBody) extends BbbCoreMsg +case class StopMeetingTranscodersSysCmdMsgBody() + +/* Out Messages */ +object StartProbingSysRespMsg {val NAME = "StartProbingSysRespMsg"} +case class StartProbingSysRespMsg(header: BbbCoreHeaderWithMeetingId, body: StartProbingSysRespMsgBody) extends BbbCoreMsg +case class StartProbingSysRespMsgBody(transcoderId: String, params: Map[String, String]) + +object StartTranscoderSysRespMsg {val NAME = "StartTranscoderSysRespMsg"} +case class StartTranscoderSysRespMsg(header: BbbCoreHeaderWithMeetingId, body: StartTranscoderSysRespMsgBody) extends BbbCoreMsg +case class StartTranscoderSysRespMsgBody(transcoderId: String, params: Map[String, String]) + +object StopTranscoderSysRespMsg {val NAME = "StopTranscoderSysRespMsg"} +case class StopTranscoderSysRespMsg(header: BbbCoreHeaderWithMeetingId, body: StopTranscoderSysRespMsgBody) extends BbbCoreMsg +case class StopTranscoderSysRespMsgBody(transcoderId: String) + +object UpdateTranscoderSysRespMsg {val NAME = "UpdateTranscoderSysRespMsg"} +case class UpdateTranscoderSysRespMsg(header: BbbCoreHeaderWithMeetingId, body: UpdateTranscoderSysRespMsgBody) extends BbbCoreMsg +case class UpdateTranscoderSysRespMsgBody(transcoderId: String, params: Map[String, String]) \ No newline at end of file diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/Constants.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/Constants.java new file mode 100644 index 000000000000..1d1841358b52 --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/Constants.java @@ -0,0 +1,34 @@ +package org.bigbluebutton.red5.pubsub; + +public class Constants { + public static final String LOCAL_IP_ADDRESS = "local_ip_address"; + public static final String LOCAL_VIDEO_PORT = "local_video_port"; + public static final String REMOTE_VIDEO_PORT = "remote_video_port"; + public static final String DESTINATION_IP_ADDRESS = "destination_ip_address"; + public static final String SIP_HOST = "sip_host"; + public static final String TRANSCODER_TYPE = "transcoder_type"; + public static final String INPUT = "input"; + public static final String OUTPUT = "output"; + public static final String TRANSCODE_RTP_TO_RTMP = "transcode_rtp_to_rtmp"; + public static final String TRANSCODE_RTMP_TO_RTP = "transcode_rtmp_to_rtp"; + public static final String TRANSCODE_FILE_TO_RTP = "transcode_file_to_rtp"; + public static final String TRANSCODE_FILE_TO_RTMP = "transcode_file_to_rtmp"; + public static final String TRANSCODE_H264_TO_H263 = "transcode_h264_to_h263"; + public static final String TRANSCODE_ROTATE_RIGHT = "transcode_rotate_right"; + public static final String TRANSCODE_ROTATE_LEFT = "transcode_rotate_left"; + public static final String TRANSCODE_ROTATE_UPSIDE_DOWN = "transcode_rotate_upside_down"; + public static final String MODULE = "module"; + public static final String PROBE_RTMP = "probe_rtmp"; + + public static final String HEADER = "header"; + public static final String CORE = "core"; + public static final String BODY = "body"; + public static final String ENVELOPE = "envelope"; + public static final String ROUTING = "routing"; + public static final String NAME = "name"; + public static final String SENDER = "sender"; + public static final String PARAMS = "params"; + public static final String TRANSCODER_ID = "transcoderId"; + public static final String MEETING_ID = "meetingId"; + public static final String VIDEO = "video"; +} \ No newline at end of file diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessagePublisher.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessagePublisher.java index ae4c66badb90..5b6de28e9828 100755 --- a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessagePublisher.java +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessagePublisher.java @@ -1,15 +1,14 @@ package org.bigbluebutton.red5.pubsub; import org.bigbluebutton.common.messages.MessagingConstants; -import org.bigbluebutton.common.messages.Constants; import org.bigbluebutton.common.messages.UserSharedWebcamMessage; import org.bigbluebutton.common.messages.UserUnshareWebcamRequestMessage; -import org.bigbluebutton.common.messages.StartTranscoderRequestMessage; -import org.bigbluebutton.common.messages.StopTranscoderRequestMessage; import java.util.HashMap; import java.util.Map; +import com.google.gson.Gson; + public class MessagePublisher { private MessageSender sender; @@ -29,54 +28,90 @@ public void userUnshareWebcamRequestMessage(String meetingId, String userId, Str sender.send(MessagingConstants.TO_USERS_CHANNEL, msg.toJson()); } - public void startRotateLeftTranscoderRequest(String meetingId, String userId, String streamName, String ipAddress) { + public void startRotateLeftTranscoderRequest(String meetingId, String transcoderId, String streamName, String ipAddress) { Map params = new HashMap(); params.put(Constants.TRANSCODER_TYPE, Constants.TRANSCODE_ROTATE_LEFT); params.put(Constants.LOCAL_IP_ADDRESS, ipAddress); params.put(Constants.DESTINATION_IP_ADDRESS, ipAddress); params.put(Constants.INPUT, streamName); - // TODO: transcoderId is getting userId, this may have to change - StartTranscoderRequestMessage msg = new StartTranscoderRequestMessage(meetingId, userId, params); - sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg.toJson()); + Map body = new HashMap(); + body.put(Constants.TRANSCODER_ID, transcoderId); + body.put(Constants.PARAMS, params); + String msg = buildJson("StartTranscoderSysReqMsg", meetingId, body); + sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg); } - public void startRotateRightTranscoderRequest(String meetingId, String userId, String streamName, String ipAddress) { + public void startRotateRightTranscoderRequest(String meetingId, String transcoderId, String streamName, String ipAddress) { Map params = new HashMap(); params.put(Constants.TRANSCODER_TYPE, Constants.TRANSCODE_ROTATE_RIGHT); params.put(Constants.LOCAL_IP_ADDRESS, ipAddress); params.put(Constants.DESTINATION_IP_ADDRESS, ipAddress); params.put(Constants.INPUT, streamName); - // TODO: transcoderId is getting userId, this may have to change - StartTranscoderRequestMessage msg = new StartTranscoderRequestMessage(meetingId, userId, params); - sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg.toJson()); + Map body = new HashMap(); + body.put(Constants.TRANSCODER_ID, transcoderId); + body.put(Constants.PARAMS, params); + String msg = buildJson("StartTranscoderSysReqMsg", meetingId, body); + sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg); } - public void startRotateUpsideDownTranscoderRequest(String meetingId, String userId, String streamName, String ipAddress) { + public void startRotateUpsideDownTranscoderRequest(String meetingId, String transcoderId, String streamName, String ipAddress) { Map params = new HashMap(); params.put(Constants.TRANSCODER_TYPE, Constants.TRANSCODE_ROTATE_UPSIDE_DOWN); params.put(Constants.LOCAL_IP_ADDRESS, ipAddress); params.put(Constants.DESTINATION_IP_ADDRESS, ipAddress); params.put(Constants.INPUT, streamName); - // TODO: transcoderId is getting userId, this may have to change - StartTranscoderRequestMessage msg = new StartTranscoderRequestMessage(meetingId, userId, params); - sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg.toJson()); + Map body = new HashMap(); + body.put(Constants.TRANSCODER_ID, transcoderId); + body.put(Constants.PARAMS, params); + String msg = buildJson("StartTranscoderSysReqMsg", meetingId, body); + sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg); } - public void startH264ToH263TranscoderRequest(String meetingId, String userId, String streamName, String ipAddress) { + public void startH264ToH263TranscoderRequest(String meetingId, String transcoderId, String streamName, String ipAddress) { Map params = new HashMap(); params.put(Constants.TRANSCODER_TYPE, Constants.TRANSCODE_H264_TO_H263); - params.put(Constants.MODULE, "video"); + params.put(Constants.MODULE, Constants.VIDEO); params.put(Constants.LOCAL_IP_ADDRESS, ipAddress); params.put(Constants.DESTINATION_IP_ADDRESS, ipAddress); params.put(Constants.INPUT, streamName); - // TODO: transcoderId is getting userId, this may have to change - StartTranscoderRequestMessage msg = new StartTranscoderRequestMessage(meetingId, userId, params); - sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg.toJson()); + Map body = new HashMap(); + body.put(Constants.TRANSCODER_ID, transcoderId); + body.put(Constants.PARAMS, params); + String msg = buildJson("StartTranscoderSysReqMsg", meetingId, body); + sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg); + } + + public void stopTranscoderRequest(String meetingId, String transcoderId) { + Map body = new HashMap(); + body.put(Constants.TRANSCODER_ID, transcoderId); + String msg = buildJson("StopTranscoderSysReqMsg", meetingId, body); + sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg); + } + + public String buildJson(String name, String meetingId, Map body) { + Map message = new HashMap(); + message.put(Constants.ENVELOPE, buildEnvelope(name)); + Map core = new HashMap(); + core.put(Constants.HEADER, buildHeader(name, meetingId)); + core.put(Constants.BODY, body); + message.put(Constants.CORE, core); + Gson gson = new Gson(); + return gson.toJson(message); + } + + private Map buildEnvelope(String name) { + Map envelope = new HashMap(); + envelope.put(Constants.NAME, name); + Map routing = new HashMap(); + routing.put(Constants.SENDER, "bbb-video"); + envelope.put(Constants.ROUTING, routing); + return envelope; } - public void stopTranscoderRequest(String meetingId, String userId) { - // TODO: transcoderId is getting userId, this may have to change - StopTranscoderRequestMessage msg = new StopTranscoderRequestMessage(meetingId, userId); - sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg.toJson()); + private Map buildHeader(String name, String meetingId) { + Map header = new HashMap(); + header.put(Constants.NAME, name); + header.put(Constants.MEETING_ID, meetingId); + return header; } -} +} \ No newline at end of file diff --git a/deskshare/app/src/main/java/org/bigbluebutton/deskshare/server/red5/pubsub/MessagePublisher.java b/deskshare/app/src/main/java/org/bigbluebutton/deskshare/server/red5/pubsub/MessagePublisher.java index a7da742c76f9..cfb630c60c95 100644 --- a/deskshare/app/src/main/java/org/bigbluebutton/deskshare/server/red5/pubsub/MessagePublisher.java +++ b/deskshare/app/src/main/java/org/bigbluebutton/deskshare/server/red5/pubsub/MessagePublisher.java @@ -21,10 +21,6 @@ import java.util.Map; import java.util.HashMap; -import org.bigbluebutton.common.messages.MessagingConstants; -import org.bigbluebutton.common.messages.Constants; -import org.bigbluebutton.common.messages.StartTranscoderRequestMessage; -import org.bigbluebutton.common.messages.StopTranscoderRequestMessage; import org.bigbluebutton.deskshare.server.red5.pubsub.MessageSender; public class MessagePublisher { @@ -36,20 +32,10 @@ public void setMessageSender(MessageSender sender) { } public void startH264ToH263TranscoderRequest(String meetingId, String streamName, String ipAddress) { - Map params = new HashMap(); - params.put(Constants.TRANSCODER_TYPE, Constants.TRANSCODE_H264_TO_H263); - params.put(Constants.MODULE, "deskShare"); - params.put(Constants.LOCAL_IP_ADDRESS, ipAddress); - params.put(Constants.DESTINATION_IP_ADDRESS, ipAddress); - params.put(Constants.INPUT, streamName); - // TODO: transcoderId is getting meetingId, this may have to change - StartTranscoderRequestMessage msg = new StartTranscoderRequestMessage(meetingId, meetingId, params); - sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg.toJson()); + // NOT USED ANYMORE } public void stopTranscoderRequest(String meetingId) { - // TODO: transcoderId is getting meetingId, this may have to change - StopTranscoderRequestMessage msg = new StopTranscoderRequestMessage(meetingId, meetingId); - sender.send(MessagingConstants.TO_BBB_TRANSCODE_SYSTEM_CHAN, msg.toJson()); + // NOT USED ANYMORE } } diff --git a/labs/kurento-screenshare/lib/bbb/messages/Messaging.js b/labs/kurento-screenshare/lib/bbb/messages/Messaging.js index 2febbd22cc1f..adee015ba93c 100644 --- a/labs/kurento-screenshare/lib/bbb/messages/Messaging.js +++ b/labs/kurento-screenshare/lib/bbb/messages/Messaging.js @@ -8,6 +8,10 @@ var StartTranscoderRequestMessage = require('./transcode/StartTranscoderRequestMessage.js')(Constants); var StopTranscoderRequestMessage = require('./transcode/StopTranscoderRequestMessage.js')(Constants); +var StartTranscoderSysReqMsg = + require('./transcode/StartTranscoderSysReqMsg.js')(); +var StopTranscoderSysReqMsg = + require('./transcode/StopTranscoderSysReqMsg.js')(); var DeskShareRTMPBroadcastStartedEventMessage = require('./screenshare/DeskShareRTMPBroadcastStartedEventMessage.js')(Constants); var DeskShareRTMPBroadcastStoppedEventMessage = @@ -27,13 +31,13 @@ function Messaging() {} Messaging.prototype.generateStartTranscoderRequestMessage = function(meetingId, transcoderId, params) { - var statrm = new StartTranscoderRequestMessage(meetingId, transcoderId, params); + var statrm = new StartTranscoderSysReqMsg(meetingId, transcoderId, params); return statrm.toJson(); } Messaging.prototype.generateStopTranscoderRequestMessage = function(meetingId, transcoderId) { - var stotrm = new StopTranscoderRequestMessage(meetingId, transcoderId); + var stotrm = new StopTranscoderSysReqMsg(meetingId, transcoderId); return stotrm.toJson(); } diff --git a/labs/kurento-screenshare/lib/bbb/messages/transcode/StartTranscoderSysReqMsg.js b/labs/kurento-screenshare/lib/bbb/messages/transcode/StartTranscoderSysReqMsg.js new file mode 100644 index 000000000000..6cbffedcac80 --- /dev/null +++ b/labs/kurento-screenshare/lib/bbb/messages/transcode/StartTranscoderSysReqMsg.js @@ -0,0 +1,17 @@ +var inherits = require('inherits'); +var OutMessage2x = require('../OutMessage2x'); + +module.exports = function() { + function StartTranscoderSysReqMsg(meetingId, transcoderId, params) { + StartTranscoderSysReqMsg.super_.call(this, "StartTranscoderSysReqMsg", + {sender: "kurento-screenshare"}, + {meetingId: meetingId}); + + this.core.body = {}; + this.core.body["transcoderId"] = transcoderId; + this.core.body["params"] = params; + }; + + inherits(StartTranscoderSysReqMsg, OutMessage2x); + return StartTranscoderSysReqMsg; +} \ No newline at end of file diff --git a/labs/kurento-screenshare/lib/bbb/messages/transcode/StopTranscoderSysReqMsg.js b/labs/kurento-screenshare/lib/bbb/messages/transcode/StopTranscoderSysReqMsg.js new file mode 100644 index 000000000000..543ace575367 --- /dev/null +++ b/labs/kurento-screenshare/lib/bbb/messages/transcode/StopTranscoderSysReqMsg.js @@ -0,0 +1,16 @@ +var inherits = require('inherits'); +var OutMessage2x = require('../OutMessage2x'); + +module.exports = function() { + function StopTranscoderSysReqMsg(meetingId, transcoderId) { + StopTranscoderSysReqMsg.super_.call(this, "StopTranscoderSysReqMsg", + {sender: "kurento-screenshare"}, + {meetingId: meetingId}); + + this.core.body = {}; + this.core.body["transcoderId"] = transcoderId; + }; + + inherits(StopTranscoderSysReqMsg, OutMessage2x); + return StopTranscoderSysReqMsg; +} \ No newline at end of file