Skip to content

Commit

Permalink
bump: Scala 2.13.8 (drop 2.12), Akka 2.6.18 (akka#2777)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru authored Jan 18, 2022
1 parent 4132ac9 commit 49ec3aa
Show file tree
Hide file tree
Showing 228 changed files with 695 additions and 758 deletions.
8 changes: 3 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ jobs:
env: CMD="verifyCodeStyle; mimaReportBinaryIssues"
name: "Code style check and MiMa. Run locally with: sbt verifyCodeStyle; mimaReportBinaryIssues"
if: type != cron
- env: CMD="++2.12.11 Test/compile"
name: "Compile all code with Scala 2.12 and fatal warnings enabled. Run locally with: env CI=true sbt ++2.12.11 Test/compile"
- env: CMD="++2.13.3 Test/compile"
name: "Compile all code with Scala 2.13"
- env: CMD="++2.13.8 Test/compile"
name: "Compile all code with Scala 2.13 and fatal warnings enabled. Run locally with: env CI=true sbt ++2.13.8 Test/compile"
- env: CMD="unidoc; docs/paradox"
name: "Create all API docs and create site with Paradox"

Expand Down Expand Up @@ -150,7 +148,7 @@ jobs:
- stage: licenses
script: echo "License checking is temporarily disabled"

- name: "Publish artifacts for Scala 2.12 and 2.13"
- name: "Publish artifacts for Scala 2.13"
env: CMD="ci-release"
script: openssl aes-256-cbc -K $encrypted_74014e1c3c6a_key -iv $encrypted_74014e1c3c6a_iv -in .travis/travis_gpg_secret.enc -out .travis/travis_gpg_secret.gpg -d && export PGP_SECRET=$(cat .travis/travis_gpg_secret.gpg) && ./scripts/travis.sh
- script: openssl aes-256-cbc -K $encrypted_bbf1dc4f2a07_key -iv $encrypted_bbf1dc4f2a07_iv -in .travis/travis_alpakka_rsa.enc -out .travis/id_rsa -d && eval "$(ssh-agent -s)" && chmod 600 .travis/id_rsa && ssh-add .travis/id_rsa && sbt -jvm-opts .jvmopts-travis docs/publishRsync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import javax.net.ssl.{SSLContext, TrustManager}

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/**
* Only for internal implementations
Expand Down Expand Up @@ -121,7 +121,7 @@ final class AmqpDetailsConnectionProvider private (
copy(connectionName = Option(name))

override def get: Connection = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val factory = new ConnectionFactory
credentials.foreach { credentials =>
factory.setUsername(credentials.username)
Expand Down Expand Up @@ -331,7 +331,7 @@ final class AmqpConnectionFactoryConnectionProvider private (val factory: Connec
copy(hostAndPorts = hostAndPorts.asScala.map(_.toScala).toIndexedSeq)

override def get: Connection = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
factory.newConnection(hostAndPortList.map(hp => new Address(hp._1, hp._2)).asJava)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.stream.alpakka.amqp
import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.immutable
import scala.concurrent.duration._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private trait AmqpConnectorLogic { this: GraphStageLogic =>
connection.addShutdownListener(shutdownListener)
channel.addShutdownListener(shutdownListener)

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

settings.declarations.foreach {
case d: QueueDeclaration =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
private var unackedMessages = 0

override def whenConnected(): Unit = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
channel.basicQos(bufferSize)
val consumerCallback = getAsyncCallback(handleDelivery)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class AmqpConnectorsSpec extends AmqpSpec {

val input = Vector("one", "two", "three", "four", "five")
val (rpcQueueF, probe) =
Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run
Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
rpcQueueF.futureValue

val amqpSink = AmqpSink.replyTo(
Expand Down Expand Up @@ -215,8 +215,8 @@ class AmqpConnectorsSpec extends AmqpSpec {

val publisher = TestPublisher.probe[ByteString]()
val subscriber = TestSubscriber.probe[ReadResult]()
amqpSink.addAttributes(Attributes.inputBuffer(1, 1)).runWith(Source.fromPublisher(publisher))
amqpSource.addAttributes(Attributes.inputBuffer(1, 1)).runWith(Sink.fromSubscriber(subscriber))
Source.fromPublisher(publisher).to(amqpSink).addAttributes(Attributes.inputBuffer(1, 1)).run()
amqpSource.to(Sink.fromSubscriber(subscriber)).addAttributes(Attributes.inputBuffer(1, 1)).run()

// note that this essentially is testing rabbitmq just as much as it tests our sink and source
publisher.ensureSubscription()
Expand Down Expand Up @@ -335,7 +335,7 @@ class AmqpConnectorsSpec extends AmqpSpec {
.viaMat(amqpRpcFlow)(Keep.right)
.mapAsync(1)(cm => cm.ack().map(_ => cm.message))
.toMat(TestSink.probe)(Keep.both)
.run
.run()
rpcQueueF.futureValue

val amqpSink = AmqpSink.replyTo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.viaMat(mockedFlowWithContextAndConfirm)(Keep.right)
.asSource
.toMat(TestSink.probe)(Keep.both)
.run
.run()

probe.request(input.size)

Expand Down Expand Up @@ -239,7 +239,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.viaMat(mockedUnorderedFlowWithPassThrough)(Keep.right)
.asSource
.toMat(TestSink.probe)(Keep.both)
.run
.run()

probe.request(input.size)

Expand Down Expand Up @@ -279,7 +279,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
.run
.run()

val messages = probe.request(input.size).expectNextN(input.size)

Expand All @@ -297,7 +297,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
.run
.run()

val messages = probe.request(input.size).expectNextN(input.size)

Expand All @@ -314,7 +314,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => (WriteMessage(ByteString(s)), s))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
.run
.run()

val messages = probe.request(input.size).expectNextN(input.size)

Expand Down Expand Up @@ -352,7 +352,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
.run
.run()

probe.request(input.size)

Expand Down Expand Up @@ -380,7 +380,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
.run
.run()

val messages = probe.request(input.size).expectNextN(input.size)

Expand All @@ -399,7 +399,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
.run
.run()

probe.request(input.size)

Expand Down Expand Up @@ -438,7 +438,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.right)
.run
.run()

probe.request(sourceElements)

Expand All @@ -458,7 +458,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.left)
.toMat(TestSink.probe)(Keep.both)
.run
.run()

sinkProbe.request(input.size)
input.foreach(sourceProbe.sendNext)
Expand Down
4 changes: 2 additions & 2 deletions amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class AmqpDocsSpec extends AmqpSpec {
.map(s => ByteString(s))
.viaMat(amqpRpcFlow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
.run
.run()
//#create-rpc-flow
rpcQueueF.futureValue

Expand Down Expand Up @@ -153,7 +153,7 @@ class AmqpDocsSpec extends AmqpSpec {
}
//#create-exchange-source

val completion = Promise[Done]
val completion = Promise[Done]()
val mergingFlow = mergedSources
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.fold(Set.empty[Int]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
import org.scalacheck.Gen
import org.scalatest.{BeforeAndAfterAll, Suite}

import scala.language.higherKinds
import scala.reflect.io.Directory
import scala.util.Random

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ class AvroParquetSinkSpec
}

"create new parquet file from any subtype of `GenericRecord` " in assertAllStagesStopped {
import scala.language.higherKinds

//given
val n: Int = 3
val file: String = genFinalFile.sample.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import scala.collection.mutable.Queue
retrieveMessages()

def retrieveMessages(): Unit = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val res = cloudQueueBuilt
.retrieveMessages(settings.batchSize, settings.initialVisibilityTimeout, null, null)
.asScala
Expand All @@ -51,7 +51,7 @@ import scala.collection.mutable.Queue
}
} else {
buffer ++= res
push(out, buffer.dequeue)
push(out, buffer.dequeue())
}
}

Expand All @@ -60,7 +60,7 @@ import scala.collection.mutable.Queue
new OutHandler {
override def onPull: Unit =
if (!buffer.isEmpty) {
push(out, buffer.dequeue)
push(out, buffer.dequeue())
} else {
retrieveMessages()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._
import org.scalatest._

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Properties
Expand Down Expand Up @@ -45,13 +45,13 @@ class AzureQueueSpec extends TestKit(ActorSystem()) with AsyncFlatSpecLike with
test()
}

override def beforeAll: Unit =
override def beforeAll(): Unit =
queueOpt.map(_.createIfNotExists)

override def afterAll: Unit = {
override def afterAll(): Unit = {
queueOpt.map(_.deleteIfExists)
TestKit.shutdownActorSystem(system)
super.afterAll
super.afterAll()
}

private var testMsgCount = 0
Expand Down
48 changes: 24 additions & 24 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ lazy val alpakka = project
mqttStreamingBench,
// googleCloudPubSubGrpc and googleCloudBigQueryStorage contain the same gRPC generated classes
// don't include ScalaDocs for googleCloudBigQueryStorage to make it work
googleCloudBigQueryStorage
googleCloudBigQueryStorage,
// springWeb triggers an esoteric ScalaDoc bug (from Java code)
springWeb
),
crossScalaVersions := List() // workaround for https://github.com/sbt/sbt/issues/3465
)
Expand Down Expand Up @@ -164,28 +166,23 @@ lazy val geode =
"geode",
Dependencies.Geode,
Test / fork := true,
Compile / unmanagedSourceDirectories ++= {
val sourceDir = (Compile / sourceDirectory).value
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n >= 12 => Seq(sourceDir / "scala-2.12+")
case _ => Seq.empty
}
}
// https://github.com/scala/bug/issues/12072
Test / scalacOptions += "-Xlint:-byname-implicit"
)

lazy val googleCommon = alpakkaProject(
"google-common",
"google.common",
Dependencies.GoogleCommon,
Test / fork := true,
fatalWarnings := true
Test / fork := true
)

lazy val googleCloudBigQuery = alpakkaProject(
"google-cloud-bigquery",
"google.cloud.bigquery",
Dependencies.GoogleBigQuery,
Test / fork := true
Test / fork := true,
Compile / scalacOptions += "-Wconf:src=src_managed/.+:s"
).dependsOn(googleCommon).enablePlugins(spray.boilerplate.BoilerplatePlugin)

lazy val googleCloudBigQueryStorage = alpakkaProject(
Expand All @@ -198,8 +195,8 @@ lazy val googleCloudBigQueryStorage = alpakkaProject(
Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Server),
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Scala, AkkaGrpc.Java),
Compile / scalacOptions ++= Seq(
"-P:silencer:pathFilters=akka-grpc/main",
"-P:silencer:pathFilters=akka-grpc/test"
"-Wconf:src=.+/akka-grpc/main/.+:s",
"-Wconf:src=.+/akka-grpc/test/.+:s"
),
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation")
).dependsOn(googleCommon).enablePlugins(AkkaGrpcPlugin)
Expand All @@ -224,8 +221,8 @@ lazy val googleCloudPubSubGrpc = alpakkaProject(
// for the ExampleApp in the tests
run / connectInput := true,
Compile / scalacOptions ++= Seq(
"-P:silencer:pathFilters=akka-grpc/main",
"-P:silencer:pathFilters=akka-grpc/test"
"-Wconf:src=.+/akka-grpc/main/.+:s",
"-Wconf:src=.+/akka-grpc/test/.+:s"
),
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation")
).enablePlugins(AkkaGrpcPlugin).dependsOn(googleCommon)
Expand All @@ -247,7 +244,15 @@ lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs)
lazy val huaweiPushKit =
alpakkaProject("huawei-push-kit", "huawei.pushkit", Dependencies.HuaweiPushKit)

lazy val influxdb = alpakkaProject("influxdb", "influxdb", Dependencies.InfluxDB)
lazy val influxdb = alpakkaProject(
"influxdb",
"influxdb",
Dependencies.InfluxDB,
Compile / scalacOptions ++= Seq(
// JDK 11: method isAccessible in class AccessibleObject is deprecated
"-Wconf:cat=deprecation:s"
)
)

lazy val ironmq = alpakkaProject(
"ironmq",
Expand Down Expand Up @@ -420,10 +425,7 @@ lazy val `doc-examples` = project
.settings(
name := s"akka-stream-alpakka-doc-examples",
publish / skip := true,
// More projects are not available for Scala 2.13
crossScalaVersions -= Dependencies.Scala213,
Dependencies.`Doc-examples`,
fatalWarnings := true
Dependencies.`Doc-examples`
)

def alpakkaProject(projectId: String, moduleName: String, additionalSettings: sbt.Def.SettingsDefinition*): Project = {
Expand All @@ -439,8 +441,7 @@ def alpakkaProject(projectId: String, moduleName: String, additionalSettings: sb
.getOrElse(throw new Error("Unable to determine previous version"))
),
mimaBinaryIssueFilters += ProblemFilters.exclude[Problem]("*.impl.*"),
Test / parallelExecution := false,
fatalWarnings := true
Test / parallelExecution := false
)
.settings(additionalSettings: _*)
.dependsOn(testkit % Test)
Expand All @@ -452,8 +453,7 @@ def internalProject(projectId: String, additionalSettings: sbt.Def.SettingsDefin
.disablePlugins(SitePlugin, MimaPlugin)
.settings(
name := s"akka-stream-alpakka-$projectId",
publish / skip := true,
fatalWarnings := true
publish / skip := true
)
.settings(additionalSettings: _*)

Expand Down
Loading

0 comments on commit 49ec3aa

Please sign in to comment.