Skip to content

Commit

Permalink
zio 2.0.0 (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray authored Jul 3, 2022
1 parent 2589262 commit 79b7524
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 165 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ inThisBuild(
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")

val zioVersion = "2.0.0-RC6"
val zioVersion = "2.0.0"
val awsVersion = "2.16.61"

lazy val `zio-s3` = project
Expand All @@ -35,8 +35,8 @@ lazy val `zio-s3` = project
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-nio" % "2.0.0-RC7",
"dev.zio" %% "zio-interop-reactivestreams" % "2.0.0-RC7",
"dev.zio" %% "zio-nio" % "2.0.0",
"dev.zio" %% "zio-interop-reactivestreams" % "2.0.0",
"software.amazon.awssdk" % "s3" % awsVersion,
"software.amazon.awssdk" % "sts" % awsVersion,
"dev.zio" %% "zio-test" % zioVersion % Test,
Expand Down
321 changes: 160 additions & 161 deletions src/main/scala/zio/s3/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,173 +46,172 @@ object Test {
.build()
.asInstanceOf[S3Exception]

def connect(path: ZPath): S3 = {
def connect(path: ZPath): UIO[S3] = {
type ContentType = String
type Metadata = Map[String, String]

new S3 {
private val refDb: Ref[Map[String, (ContentType, Metadata)]] =
Ref.unsafeMake(Map.empty[String, (ContentType, Metadata)])

override def createBucket(bucketName: String): IO[S3Exception, Unit] =
Files.createDirectory(path / bucketName).orDie

override def deleteBucket(bucketName: String): IO[S3Exception, Unit] =
Files.delete(path / bucketName).orDie

override def isBucketExists(bucketName: String): IO[S3Exception, Boolean] =
Files.exists(path / bucketName)

override val listBuckets: IO[S3Exception, S3BucketListing] =
Files
.list(path)
.filterZIO(p => Files.readAttributes[BasicFileAttributes](p).map(_.isDirectory))
.mapZIO { p =>
Files
.readAttributes[BasicFileAttributes](p)
.map(attr => S3Bucket(p.filename.toString, attr.creationTime().toInstant))
}
.runCollect
.orDie

override def deleteObject(bucketName: String, key: String): IO[S3Exception, Unit] =
Files.deleteIfExists(path / bucketName / key).orDie.unit

override def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] =
ZStream
.scoped(ZIO.fromAutoCloseable(ZIO.attempt(new FileInputStream((path / bucketName / key).toFile))))
.flatMap(ZStream.fromInputStream(_, 2048))
.refineOrDie {
case e: FileNotFoundException => fileNotFound(e)
}

override def getObjectMetadata(bucketName: String, key: String): IO[S3Exception, ObjectMetadata] =
(for {
res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String]))
(contentType, metadata) = res
file <- Files
.readAttributes[BasicFileAttributes](path / bucketName / key)
.map(p => ObjectMetadata(metadata, contentType, p.size()))
} yield file).orDie

override def listObjects(
bucketName: String,
options: ListObjectOptions
): IO[S3Exception, S3ObjectListing] =
Files
.find(path / bucketName) {
case (p, _) if options.delimiter.nonEmpty =>
options.prefix.fold(true)((path / bucketName).relativize(p).toString().startsWith)
case (p, _) =>
options.prefix.fold(true)(p.filename.toString().startsWith)
}
.mapZIO(p => Files.readAttributes[BasicFileAttributes](p).map(a => a -> p))
.filter { case (attr, _) => attr.isRegularFile }
.map {
case (attr, f) =>
S3ObjectSummary(
bucketName,
(path / bucketName).relativize(f).toString(),
attr.lastModifiedTime().toInstant,
attr.size()
)
}
.runCollect
.map(
_.sortBy(_.key)
.mapAccum(options.starAfter) {
case (Some(startWith), o) =>
if (startWith.startsWith(o.key))
None -> Chunk.empty
else
Some(startWith) -> Chunk.empty
case (_, o) =>
None -> Chunk(o)
}
._2
.flatten
)
.map {
case list if list.size > options.maxKeys =>
S3ObjectListing(
bucketName,
options.delimiter,
options.starAfter,
list.take(options.maxKeys.toInt),
Some(UUID.randomUUID().toString),
None
)
case list =>
S3ObjectListing(bucketName, options.delimiter, options.starAfter, list, None, None)
Ref.make(Map.empty[String, (ContentType, Metadata)]).map { refDb =>
new S3 {
override def createBucket(bucketName: String): IO[S3Exception, Unit] =
Files.createDirectory(path / bucketName).orDie

override def deleteBucket(bucketName: String): IO[S3Exception, Unit] =
Files.delete(path / bucketName).orDie

override def isBucketExists(bucketName: String): IO[S3Exception, Boolean] =
Files.exists(path / bucketName)

override val listBuckets: IO[S3Exception, S3BucketListing] =
Files
.list(path)
.filterZIO(p => Files.readAttributes[BasicFileAttributes](p).map(_.isDirectory))
.mapZIO { p =>
Files
.readAttributes[BasicFileAttributes](p)
.map(attr => S3Bucket(p.filename.toString, attr.creationTime().toInstant))
}
.runCollect
.orDie

override def deleteObject(bucketName: String, key: String): IO[S3Exception, Unit] =
Files.deleteIfExists(path / bucketName / key).orDie.unit

override def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] =
ZStream
.scoped(ZIO.fromAutoCloseable(ZIO.attempt(new FileInputStream((path / bucketName / key).toFile))))
.flatMap(ZStream.fromInputStream(_, 2048))
.refineOrDie {
case e: FileNotFoundException => fileNotFound(e)
}

override def getObjectMetadata(bucketName: String, key: String): IO[S3Exception, ObjectMetadata] =
(for {
res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String]))
(contentType, metadata) = res
file <- Files
.readAttributes[BasicFileAttributes](path / bucketName / key)
.map(p => ObjectMetadata(metadata, contentType, p.size()))
} yield file).orDie

override def listObjects(
bucketName: String,
options: ListObjectOptions
): IO[S3Exception, S3ObjectListing] =
Files
.find(path / bucketName) {
case (p, _) if options.delimiter.nonEmpty =>
options.prefix.fold(true)((path / bucketName).relativize(p).toString().startsWith)
case (p, _) =>
options.prefix.fold(true)(p.filename.toString().startsWith)
}
.mapZIO(p => Files.readAttributes[BasicFileAttributes](p).map(a => a -> p))
.filter { case (attr, _) => attr.isRegularFile }
.map {
case (attr, f) =>
S3ObjectSummary(
bucketName,
(path / bucketName).relativize(f).toString(),
attr.lastModifiedTime().toInstant,
attr.size()
)
}
.runCollect
.map(
_.sortBy(_.key)
.mapAccum(options.starAfter) {
case (Some(startWith), o) =>
if (startWith.startsWith(o.key))
None -> Chunk.empty
else
Some(startWith) -> Chunk.empty
case (_, o) =>
None -> Chunk(o)
}
._2
.flatten
)
.map {
case list if list.size > options.maxKeys =>
S3ObjectListing(
bucketName,
options.delimiter,
options.starAfter,
list.take(options.maxKeys.toInt),
Some(UUID.randomUUID().toString),
None
)
case list =>
S3ObjectListing(bucketName, options.delimiter, options.starAfter, list, None, None)
}
.orDie

override def getNextObjects(listing: S3ObjectListing): IO[S3Exception, S3ObjectListing] =
listing.nextContinuationToken match {
case Some(token) if token.nonEmpty => listObjects(listing.bucketName, ListObjectOptions.fromMaxKeys(100))
case _ => ZIO.dieMessage("Empty token is invalid")
}
.orDie

override def getNextObjects(listing: S3ObjectListing): IO[S3Exception, S3ObjectListing] =
listing.nextContinuationToken match {
case Some(token) if token.nonEmpty => listObjects(listing.bucketName, ListObjectOptions.fromMaxKeys(100))
case _ => ZIO.dieMessage("Empty token is invalid")
override def putObject[R](
bucketName: String,
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions
): ZIO[R, S3Exception, Unit] =
(for {
_ <- refDb.update(db =>
db + (bucketName + key -> (options.contentType
.getOrElse("application/octet-stream") -> options.metadata))
)
filePath = path / bucketName / key
_ <- filePath.parent
.map(parentPath => Files.createDirectories(parentPath))
.getOrElse(ZIO.unit)

_ <- ZIO.scoped[R](
AsynchronousFileChannel
.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.CREATE
)
.flatMap(channel =>
content
.mapChunks(Chunk.succeed)
.runFoldZIO(0L) { case (pos, c) => channel.writeChunk(c, pos).as(pos + c.length) }
)
)
} yield ()).orDie

override def execute[T](f: S3AsyncClient => CompletableFuture[T]): IO[S3Exception, T] =
ZIO.dieMessage("Not implemented error - please don't call execute() S3 Test mode")

override def multipartUpload[R](
bucketName: String,
key: String,
content: ZStream[R, Throwable, Byte],
options: MultipartUploadOptions
)(parallelism: Int): ZIO[R, S3Exception, Unit] = {
val _contentType = options.uploadOptions.contentType.orElse(Some("binary/octet-stream"))

for {
_ <- ZIO.dieMessage(s"parallelism must be > 0. $parallelism is invalid").unless(parallelism > 0)
_ <-
ZIO
.dieMessage(
s"Invalid part size ${Math.floor(options.partSize.toDouble / PartSize.Mega.toDouble * 100d) / 100d} Mb, minimum size is ${PartSize.Min / PartSize.Mega} Mb"
)
.unless(options.partSize >= PartSize.Min)
_ <- putObject(
bucketName,
key,
0,
content.rechunk(options.partSize),
options.uploadOptions.copy(contentType = _contentType)
)
} yield ()
}

override def putObject[R](
bucketName: String,
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions
): ZIO[R, S3Exception, Unit] =
(for {
_ <-
refDb.update(db =>
db + (bucketName + key -> (options.contentType.getOrElse("application/octet-stream") -> options.metadata))
)
filePath = path / bucketName / key
_ <- filePath.parent
.map(parentPath => Files.createDirectories(parentPath))
.getOrElse(ZIO.unit)

_ <- ZIO.scoped[R](
AsynchronousFileChannel
.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.CREATE
)
.flatMap(channel =>
content
.mapChunks(Chunk.succeed)
.runFoldZIO(0L) { case (pos, c) => channel.writeChunk(c, pos).as(pos + c.length) }
)
)
} yield ()).orDie

override def execute[T](f: S3AsyncClient => CompletableFuture[T]): IO[S3Exception, T] =
ZIO.dieMessage("Not implemented error - please don't call execute() S3 Test mode")

override def multipartUpload[R](
bucketName: String,
key: String,
content: ZStream[R, Throwable, Byte],
options: MultipartUploadOptions
)(parallelism: Int): ZIO[R, S3Exception, Unit] = {
val _contentType = options.uploadOptions.contentType.orElse(Some("binary/octet-stream"))

for {
_ <- ZIO.dieMessage(s"parallelism must be > 0. $parallelism is invalid").unless(parallelism > 0)
_ <-
ZIO
.dieMessage(
s"Invalid part size ${Math.floor(options.partSize.toDouble / PartSize.Mega.toDouble * 100d) / 100d} Mb, minimum size is ${PartSize.Min / PartSize.Mega} Mb"
)
.unless(options.partSize >= PartSize.Min)
_ <- putObject(
bucketName,
key,
0,
content.rechunk(options.partSize),
options.uploadOptions.copy(contentType = _contentType)
)
} yield ()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/zio/s3/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ package object s3 {
)

def stub(path: ZPath): ZLayer[Any, Nothing, S3] =
ZLayer.succeed(Test.connect(path))
ZLayer.fromZIO(Test.connect(path))

def listAllObjects(bucketName: String): S3Stream[S3ObjectSummary] =
ZStream.serviceWithStream[S3](_.listAllObjects(bucketName))
Expand Down

0 comments on commit 79b7524

Please sign in to comment.