diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index ed6f85e6..f0422a6b 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -24,3 +24,6 @@ a835b7008a8cd6bfe97706dd8823593ae6bc99f7 # Scala Steward: Reformat with scalafmt 3.8.4 bc673ded2f2fe337f901f43c41e07c29be15676f + +# Scala Steward: Reformat with scalafmt 3.9.10 +6e7eaf18653615e19ca7cb83cab17ac6e049c71b diff --git a/.github/release-drafter.yml b/.github/release-drafter.yml index 0a5d792c..f1280823 100644 --- a/.github/release-drafter.yml +++ b/.github/release-drafter.yml @@ -1,4 +1,4 @@ template: | - ## What’s Changed + ## What's Changed $CHANGES \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01cbb280..b35fbf32 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,9 @@ on: push: branches: ['**'] tags: [v*] +permissions: + contents: write # release-drafter, auto-merge requirement + pull-requests: write # labeler, auto-merge requirement jobs: ci-2-12: # run on 1) push, 2) external PRs, 3) softwaremill-ci PRs @@ -21,7 +24,7 @@ jobs: uses: actions/checkout@v2 - name: Set up SBT - uses: sbt/setup-sbt@v1 + uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1, specifically v1.1.14 - name: Set up JDK uses: actions/setup-java@v4 @@ -63,7 +66,7 @@ jobs: uses: actions/checkout@v2 - name: Set up SBT - uses: sbt/setup-sbt@v1 + uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1, specifically v1.1.14 - name: Set up JDK uses: actions/setup-java@v4 @@ -105,7 +108,7 @@ jobs: uses: actions/checkout@v2 - name: Set up SBT - uses: sbt/setup-sbt@v1 + uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1, specifically v1.1.14 - name: Set up JDK uses: actions/setup-java@v4 @@ -147,7 +150,7 @@ jobs: uses: actions/checkout@v2 - name: Set up SBT - uses: sbt/setup-sbt@v1 + uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1, specifically v1.1.14 - name: Set up JDK uses: actions/setup-java@v4 @@ -216,7 +219,7 @@ jobs: uses: actions/checkout@v2 - name: Set up SBT - uses: sbt/setup-sbt@v1 + uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1, specifically v1.1.14 - name: Set up JDK uses: actions/setup-java@v4 @@ -256,7 +259,7 @@ jobs: uses: actions/checkout@v2 - name: Set up SBT - uses: sbt/setup-sbt@v1 + uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1, specifically v1.1.14 - name: Set up JDK uses: actions/setup-java@v4 @@ -300,7 +303,7 @@ jobs: - name: Publish release notes id: create_release - uses: release-drafter/release-drafter@v5 + uses: release-drafter/release-drafter@b1476f6e6eb133afa41ed8589daba6dc69b4d3f5 # v6, specifically v6.1.0 with: config-name: release-drafter.yml publish: true @@ -329,7 +332,7 @@ jobs: uses: actions/checkout@v2 - name: Set up SBT - uses: sbt/setup-sbt@v1 + uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1, specifically v1.1.14 - name: Set up JDK uses: actions/setup-java@v4 @@ -351,7 +354,7 @@ jobs: key: ${{ runner.os }}-sbt-release-${{ hashFiles('**/build.sbt') }} - name: Login to DockerHub - uses: docker/login-action@v1 + uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3, specifically v3.6.0 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} @@ -362,40 +365,15 @@ jobs: - name: Publish JVM native docker image run: sbt "project nativeServer; assembly; Docker / publish" - # `automerge` label is attached iff there is exactly one file changed by steward and this file belongs to a - # whitelist specified by `labeler.yml` label: - name: Attach automerge label # only for PRs by softwaremill-ci if: github.event.pull_request.user.login == 'softwaremill-ci' - runs-on: ubuntu-24.04 - steps: - - uses: actions/checkout@v3 - with: - fetch-depth: 2 - # count number of files changed - - name: Count number of files changed - id: count-changed-files - run: | - N=$(git diff --name-only -r HEAD^1 HEAD | wc -w) - echo "changed_files_num=$N" >> $GITHUB_OUTPUT - - name: Launch labeler - # skip if more than one file changed - if: steps.count-changed-files.outputs.changed_files_num == 1 - uses: srvaroa/labeler@master - env: - GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" + uses: softwaremill/github-actions-workflows/.github/workflows/label.yml@main + secrets: inherit auto-merge: - name: Auto merge # only for PRs by softwaremill-ci if: github.event.pull_request.user.login == 'softwaremill-ci' needs: [ci-2-12, ci-2-13, ci-3, ci-docker, ci-native-image-agent-config-verification, label] - runs-on: ubuntu-24.04 - steps: - - id: automerge - name: automerge - uses: "pascalgn/automerge-action@v0.15.6" - env: - GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" - MERGE_METHOD: "squash" + uses: softwaremill/github-actions-workflows/.github/workflows/auto-merge.yml@main + secrets: inherit \ No newline at end of file diff --git a/.github/workflows/scala-steward.yml b/.github/workflows/scala-steward.yml index 1e062285..8ed2e866 100644 --- a/.github/workflows/scala-steward.yml +++ b/.github/workflows/scala-steward.yml @@ -6,23 +6,14 @@ on: - cron: '0 0 * * *' workflow_dispatch: +permissions: + contents: write # Required to checkout and push changes + pull-requests: write # Required to create PRs for dependency updates + jobs: scala-steward: - runs-on: ubuntu-24.04 - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Set up JDK 11 - uses: actions/setup-java@v4 - with: - distribution: 'temurin' - java-version: '11' - cache: 'sbt' - - name: Launch Scala Steward - uses: scala-steward-org/scala-steward-action@v2 - with: - author-name: scala-steward - author-email: scala-steward - github-token: ${{ secrets.REPO_GITHUB_TOKEN }} - repo-config: .scala-steward.conf - ignore-opts-files: false + uses: softwaremill/github-actions-workflows/.github/workflows/scala-steward.yml@main + secrets: + github-token: ${{ secrets.SOFTWAREMILL_CI_PR_TOKEN }} + with: + java-version: '21' diff --git a/.gitignore b/.gitignore index 451f833a..ea16afb8 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,6 @@ project/boot .env .metals -.bloop \ No newline at end of file +.bloop +.vscode +.cursor \ No newline at end of file diff --git a/.scala-steward.conf b/.scala-steward.conf index ad9ce232..f15418de 100644 --- a/.scala-steward.conf +++ b/.scala-steward.conf @@ -6,6 +6,7 @@ updates.ignore = [ {groupId = "org.scala-lang", artifactId = "scala-compiler", version = "3."} ] updates.pin = [ + {groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.3."}, {groupId = "org.scala-lang", artifactId = "scala3-library", version = "3.3."}, {groupId = "org.scala-lang", artifactId = "scala3-library_sjs1", version = "3.3."} -] \ No newline at end of file +] diff --git a/.scalafmt.conf b/.scalafmt.conf index 33b6a2de..d68f53a5 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,3 +1,3 @@ -version=3.9.4 +version=3.10.7 maxColumn = 120 runner.dialect = scala213 diff --git a/README.md b/README.md index decd0a63..1f8cd09f 100644 --- a/README.md +++ b/README.md @@ -450,7 +450,7 @@ In case of problems with file mounting on Windows place the `application.ini` an Another option is to use custom `Dockerfile`: ``` -FROM openjdk:8-jre-alpine +FROM eclipse-temurin:11-jdk-noble ARG ELASTICMQ_VERSION ENV ELASTICMQ_VERSION ${ELASTICMQ_VERSION} @@ -546,7 +546,7 @@ Publishing Docker image for two different platforms: `amd64` and `arm64` is poss Docker Buildx is included in Docker Desktop and Docker Linux packages when installed using the DEB or RPM packages. `build.sbt` has following setup: * `dockerBuildxSettings` creates Docker Buildx instance -* Docker base image is `openjdk:11-jdk-stretch` which supports multi-arch images +* Docker base image is `eclipse-temurin:11-jdk-noble` which supports multi-arch images * `dockerBuildCommand` is extended with operator `buildx` * `dockerBuildOptions` has two additional parameters: `--platform=linux/arm64,linux/amd64` and `--push` diff --git a/build.sbt b/build.sbt index 35fe1eee..db3881ac 100644 --- a/build.sbt +++ b/build.sbt @@ -9,8 +9,8 @@ import scoverage.ScoverageKeys.* import scala.sys.process.Process val v2_12 = "2.12.20" -val v2_13 = "2.13.16" -val v3 = "3.3.6" +val v2_13 = "2.13.18" +val v3 = "3.3.7" lazy val resolvedScalaVersion = sys.env.get("SCALA_MAJOR_VERSION") match { @@ -27,12 +27,12 @@ lazy val yarnTask = inputKey[Unit]("Run yarn with arguments") lazy val ensureDockerBuildx = taskKey[Unit]("Ensure that docker buildx configuration exists") lazy val dockerBuildWithBuildx = taskKey[Unit]("Build docker images using buildx") -val config = "com.typesafe" % "config" % "1.4.4" +val config = "com.typesafe" % "config" % "1.4.5" val pureConfig = "com.github.pureconfig" %% "pureconfig-core" % "0.17.8" val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "2.4.0" val scalalogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.6" -val logback = "ch.qos.logback" % "logback-classic" % "1.3.15" +val logback = "ch.qos.logback" % "logback-classic" % "1.3.16" val jclOverSlf4j = "org.slf4j" % "jcl-over-slf4j" % "2.0.17" // needed form amazon java sdk val scalatest = "org.scalatest" %% "scalatest" % "3.2.19" @@ -41,8 +41,8 @@ val awaitility = "org.awaitility" % "awaitility-scala" % "4.3.0" val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.699" exclude ("commons-logging", "commons-logging") val amazonJavaV2SdkSqs = "software.amazon.awssdk" % "sqs" % "2.25.60" -val pekkoVersion = "1.2.0" -val pekkoHttpVersion = "1.2.0" +val pekkoVersion = "1.4.0" +val pekkoHttpVersion = "1.3.0" val pekkoActor = "org.apache.pekko" %% "pekko-actor" % pekkoVersion val pekkoSlf4j = "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion val pekkoStreams = "org.apache.pekko" %% "pekko-stream" % pekkoVersion @@ -254,7 +254,7 @@ lazy val server: Project = (project in file("server")) }, // docker dockerExposedPorts := Seq(9324, 9325), - dockerBaseImage := "openjdk:11-jdk-stretch", + dockerBaseImage := "eclipse-temurin:11-jdk-noble", Docker / packageName := "elasticmq", dockerUsername := Some("softwaremill"), dockerUpdateLatest := { diff --git a/core/src/main/scala/org/elasticmq/Limits.scala b/core/src/main/scala/org/elasticmq/Limits.scala index 9ddd68f2..eaaf3a34 100644 --- a/core/src/main/scala/org/elasticmq/Limits.scala +++ b/core/src/main/scala/org/elasticmq/Limits.scala @@ -35,7 +35,7 @@ case object StrictSQSLimits extends Limits { RangeLimit(-BigDecimal.valueOf(10).pow(128), BigDecimal.valueOf(10).pow(126)) ) override val messageWaitTimeLimit: Limit[RangeLimit[Long]] = LimitedValue(RangeLimit(0L, 20L)) - override val maxMessageLength: Limit[Int] = LimitedValue(262144) + override val maxMessageLength: Limit[Int] = LimitedValue(1024 * 1024) } case object RelaxedSQSLimits extends Limits { override val queueNameLengthLimit: Limit[Int] = NoLimit diff --git a/core/src/main/scala/org/elasticmq/MessageData.scala b/core/src/main/scala/org/elasticmq/MessageData.scala index ac90b1e6..ab05ebff 100644 --- a/core/src/main/scala/org/elasticmq/MessageData.scala +++ b/core/src/main/scala/org/elasticmq/MessageData.scala @@ -2,10 +2,17 @@ package org.elasticmq import java.time.OffsetDateTime +case class MessageContent(value: String) { + override def toString: String = { + val limit = 50 + if (value.length > limit) value.take(limit) + "..." else value + } +} + case class MessageData( id: MessageId, deliveryReceipt: Option[DeliveryReceipt], - content: String, + content: MessageContent, messageAttributes: Map[String, MessageAttribute], nextDelivery: MillisNextDelivery, created: OffsetDateTime, diff --git a/core/src/main/scala/org/elasticmq/NewMessageData.scala b/core/src/main/scala/org/elasticmq/NewMessageData.scala index 8c04effe..97148f0e 100644 --- a/core/src/main/scala/org/elasticmq/NewMessageData.scala +++ b/core/src/main/scala/org/elasticmq/NewMessageData.scala @@ -2,7 +2,7 @@ package org.elasticmq case class NewMessageData( id: Option[MessageId], - content: String, + content: MessageContent, messageAttributes: Map[String, MessageAttribute], nextDelivery: NextDelivery, messageGroupId: Option[String], diff --git a/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala b/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala index 9b4f799f..09ef142e 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/InternalMessage.scala @@ -11,7 +11,7 @@ case class InternalMessage( id: String, deliveryReceipts: mutable.Buffer[String], var nextDelivery: Long, - content: String, + content: MessageContent, messageAttributes: Map[String, MessageAttribute], created: OffsetDateTime, orderIndex: Int, diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index 31e313b3..28fa465e 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -33,7 +33,7 @@ trait QueueActorMessageOps receiveMessages(visibilityTimeout, count, receiveRequestAttemptId).send() case DeleteMessage(deliveryReceipt) => deleteMessage(deliveryReceipt).send() - case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData) + case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData) case MoveMessage(message, destination, sourceQueueName) => moveMessage(message, destination, sourceQueueName).send() case DeduplicationIdsCleanup => diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala index 140d693c..ccbfec4b 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorQueueOps.scala @@ -10,7 +10,7 @@ trait QueueActorQueueOps extends Logging { def receiveAndReplyQueueMsg[T](msg: QueueQueueMsg[T]): ReplyAction[T] = msg match { - case GetQueueData() => queueData + case GetQueueData() => queueData case UpdateQueueDefaultVisibilityTimeout(newDefaultVisibilityTimeout) => logger.info(s"${queueData.name}: Updating default visibility timeout to $newDefaultVisibilityTimeout") queueData = queueData.copy(defaultVisibilityTimeout = newDefaultVisibilityTimeout) @@ -40,7 +40,7 @@ trait QueueActorQueueOps extends Logging { messageQueue.clear() fifoMessagesHistory = FifoDeduplicationIdsHistory.newHistory() case GetQueueStatistics(deliveryTime) => getQueueStatistics(deliveryTime) - case UpdateQueueTags(newQueueTags) => + case UpdateQueueTags(newQueueTags) => logger.info(s"${queueData.name} Adding and Updating tags ${newQueueTags}") queueData = queueData.copy(tags = queueData.tags ++ newQueueTags) case RemoveQueueTags(tagsToRemove) => diff --git a/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala index dddaaaf5..2e498f19 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/operations/MoveMessageOps.scala @@ -22,7 +22,7 @@ trait MoveMessageOps extends Logging { if (queueData.isFifo) { CommonOperations.wasRegistered(messageWithSourceQueueName.toNewMessageData, fifoMessagesHistory) match { case Some(_) => ResultWithEvents.empty - case None => + case None => logger.debug(s"Moved message (${messageWithSourceQueueName.id}) from FIFO queue to ${queueData.name}") moveMessageToQueue(regenerateDeduplicationId(messageWithSourceQueueName)) } diff --git a/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala b/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala index 45e47406..81619463 100644 --- a/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala +++ b/core/src/test/scala/org/elasticmq/FifoDeduplicationIdsHistoryTest.scala @@ -105,7 +105,7 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers { id = "1", deliveryReceipts = mutable.Buffer.empty, nextDelivery = 100L, - content = "", + content = MessageContent(""), messageAttributes = Map.empty, created = created, orderIndex = 0, diff --git a/core/src/test/scala/org/elasticmq/LimitsTest.scala b/core/src/test/scala/org/elasticmq/LimitsTest.scala index 7f14914a..977d1ef4 100644 --- a/core/src/test/scala/org/elasticmq/LimitsTest.scala +++ b/core/src/test/scala/org/elasticmq/LimitsTest.scala @@ -206,15 +206,15 @@ class LimitsTest extends AnyWordSpec with Matchers with EitherValues { } "Validation of message length in strict mode" should { - "pass if the length is smaller than the limit (262144)" in { + "pass if the length is equal or smaller than the limit (1024 * 1024)" in { Limits.verifyMessageLength(-5, StrictSQSLimits) shouldBe Right(()) Limits.verifyMessageLength(0, StrictSQSLimits) shouldBe Right(()) Limits.verifyMessageLength(100, StrictSQSLimits) shouldBe Right(()) - Limits.verifyMessageLength(262144, StrictSQSLimits) shouldBe Right(()) + Limits.verifyMessageLength(1024 * 1024, StrictSQSLimits) shouldBe Right(()) } "fail if the length is bigger than the limit" in { - val error = Limits.verifyMessageLength(300000, StrictSQSLimits).left.value + val error = Limits.verifyMessageLength(1024 * 1024 + 1, StrictSQSLimits).left.value error shouldBe "MessageTooLong" } } diff --git a/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala b/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala index 13e55d32..ef23b5c8 100644 --- a/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala +++ b/core/src/test/scala/org/elasticmq/actor/queue/InternalMessageSpec.scala @@ -1,6 +1,6 @@ package org.elasticmq.actor.queue -import org.elasticmq.NeverReceived +import org.elasticmq.{MessageContent, NeverReceived} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -16,7 +16,7 @@ class InternalMessageSpec extends AnyFunSuite with Matchers { id = "id", deliveryReceipts = mutable.Buffer.empty, nextDelivery = 0L, - content = "content", + content = MessageContent("content"), messageAttributes = Map.empty, created = freezedDateTime, orderIndex = 100, @@ -44,7 +44,7 @@ class InternalMessageSpec extends AnyFunSuite with Matchers { id = "id", deliveryReceipts = mutable.Buffer.empty, nextDelivery = 0L, - content = "content", + content = MessageContent("content"), messageAttributes = Map.empty, created = freezedDateTime, orderIndex = 100, diff --git a/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala b/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala index f084f6cc..05c4477f 100644 --- a/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala +++ b/core/src/test/scala/org/elasticmq/actor/queue/ReceiveRequestAttemptCacheTest.scala @@ -1,7 +1,7 @@ package org.elasticmq.actor.queue import scala.collection.mutable -import org.elasticmq.NeverReceived +import org.elasticmq.{MessageContent, NeverReceived} import org.elasticmq.actor.queue.ReceiveRequestAttemptCache.ReceiveFailure.Invalid import org.elasticmq.util.MutableNowProvider import org.scalatest.funsuite.AnyFunSuite @@ -21,7 +21,7 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers { "id-1", mutable.Buffer.empty, 1L, - "content", + MessageContent("content"), Map.empty, nowProvider.now, orderIndex = 0, @@ -72,7 +72,7 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers { "id-1", mutable.Buffer.empty, 1L, - "content", + MessageContent("content"), Map.empty, nowProvider.now, orderIndex = 0, diff --git a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala index 8473b490..9464dbf1 100644 --- a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala +++ b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala @@ -44,7 +44,7 @@ trait DataCreationHelpers { MessageData( MessageId(id), deliveryReceipt, - content, + MessageContent(content), messageAttributes, nextDelivery, OffsetDateTimeUtil.ofEpochMilli(0), @@ -67,7 +67,7 @@ trait DataCreationHelpers { ) = NewMessageData( Some(MessageId(id)), - content, + MessageContent(content), messageAttributes, nextDelivery, messageGroupId, diff --git a/performance-tests/src/test/scala/org/elasticmq/performance/LocalPerformanceTest.scala b/performance-tests/src/test/scala/org/elasticmq/performance/LocalPerformanceTest.scala index 8dc3c320..5dcdd367 100644 --- a/performance-tests/src/test/scala/org/elasticmq/performance/LocalPerformanceTest.scala +++ b/performance-tests/src/test/scala/org/elasticmq/performance/LocalPerformanceTest.scala @@ -127,17 +127,17 @@ object LocalPerformanceTest extends App { def sendMessage(m: String): Unit = { Await.result( currentQueue ? SendMessage( - NewMessageData(None, m, Map.empty, ImmediateNextDelivery, None, None, 0, None, None) + NewMessageData(None, MessageContent(m), Map.empty, ImmediateNextDelivery, None, None, 0, None, None, None) ), 10.seconds ) } - def receiveMessage() = { + def receiveMessage(): String = { val messages = Await.result(currentQueue ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None), 10.seconds) val message = messages.head Await.result(currentQueue ? DeleteMessage(message.deliveryReceipt.get), 10.seconds) - message.content + message.content.value } } diff --git a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala index 24021d47..367ed5d5 100644 --- a/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala +++ b/persistence/persistence-file/src/main/scala/org/elasticmq/persistence/file/QueueConfigUtil.scala @@ -25,7 +25,7 @@ object QueueConfigUtil extends Logging { .toMap ) match { case Success(value) => getQueuesFromConfig(value) - case Failure(ex) => { + case Failure(ex) => { logger.error("Failed to extract queue configuration", ex) throw new IllegalStateException(ex) } @@ -35,7 +35,7 @@ object QueueConfigUtil extends Logging { private def getQueuesFromConfig(queuesConfig: Map[String, ConfigValue]): List[CreateQueueMetadata] = { Try(getQueuesFromConfigUnsafe(queuesConfig)) match { case Success(value) => value - case Failure(ex) => { + case Failure(ex) => { logger.error("Failed to create queues from config", ex) throw new IllegalStateException(ex) } diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala index d9b4365b..76b4481c 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/DBMessage.scala @@ -47,7 +47,7 @@ case class DBMessage( messageId, serializedDeliveryReceipts.toBuffer, nextDelivery, - new String(content), + MessageContent(new String(content)), serializedAttrs, OffsetDateTimeUtil.ofEpochMilli(created), orderIndex = 0, @@ -105,7 +105,7 @@ object DBMessage { message.id, deliveryReceipts.toJson.toString.getBytes, message.nextDelivery, - message.content.getBytes, + message.content.value.getBytes, attributes.toJson.toString.getBytes, message.created.toInstant.toEpochMilli, received, diff --git a/project/build.properties b/project/build.properties index 5e6884d3..4d6c5670 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.11.6 +sbt.version=1.12.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 22638211..fcaa11ed 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,13 +1,13 @@ // required for sbt-github-release resolution resolvers += "Era7 maven releases" at "https://s3-eu-west-1.amazonaws.com/releases.era7.com" -val sbtSoftwaremillVersion = "2.1.0" +val sbtSoftwaremillVersion = "2.1.1" addSbtPlugin("com.softwaremill.sbt-softwaremill" % "sbt-softwaremill-common" % sbtSoftwaremillVersion) addSbtPlugin("com.softwaremill.sbt-softwaremill" % "sbt-softwaremill-publish" % sbtSoftwaremillVersion) -addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.11.3") +addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.11.7") addSbtPlugin("com.github.sbt" % "sbt-git" % "2.1.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1") -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.3.1") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.4.4") libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.601" diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index 40d8fc1d..b53880c0 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -1534,7 +1534,7 @@ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers val queueUrl = cli.createQueue(new CreateQueueRequest("testQueue1")).getQueueUrl // When - cli.sendMessage(new SendMessageRequest(queueUrl, "x" * 262145)) + cli.sendMessage(new SendMessageRequest(queueUrl, "x" * (1024 * 1024 + 1))) } } @@ -1546,8 +1546,8 @@ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers // When cli.sendMessageBatch( new SendMessageBatchRequest(queueUrl).withEntries( - new SendMessageBatchRequestEntry("1", "x" * 140000), - new SendMessageBatchRequestEntry("2", "x" * 140000) + new SendMessageBatchRequestEntry("1", "x" * (1024 * 1024)), + new SendMessageBatchRequestEntry("2", "x") ) ) } diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqlQueuePersistenceTest.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqlQueuePersistenceTest.scala index 3e1860dd..4e2316a6 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqlQueuePersistenceTest.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/SqlQueuePersistenceTest.scala @@ -40,7 +40,7 @@ class SqlQueuePersistenceTest client.sendMessage(new SendMessageRequest(queueUrl, "Message 3")) val storedMessages = (store ? GetAllMessages("testQueue1")).futureValue - storedMessages.map(_.content).toSet shouldBe Set("Message 1", "Message 2", "Message 3") + storedMessages.map(_.content.value).toSet shouldBe Set("Message 1", "Message 2", "Message 3") } startServerAndRun(pruneDataOnInit = false) { diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala index 6bbccb9a..51fe70e4 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala @@ -134,6 +134,17 @@ abstract class AmazonJavaSdkNewTestSuite message.attributes shouldBe Map(AWSTraceHeader -> "abc-123") } + test("should send and receive 1MB message") { + doTestSendAndReceiveMessageWithAttributes("x" * 1024 * 1024) + } + + test("should fail send 1MB + 1 byte message") { + val queue = testClient.createQueue("testQueue1") + val content = "x" * (1024 * 1024 + 1) + val sendResult = testClient.sendMessage(queue, content) + sendResult.isLeft shouldBe true // TODO: check error type returned by SQS + } + test("should return DeadLetterQueueSourceArn in receive message attributes") { // given val dlQueue = testClient.createQueue("testDlq") @@ -421,7 +432,9 @@ abstract class AmazonJavaSdkNewTestSuite ) = { // given val queue = testClient.createQueue("testQueue1") - testClient.sendMessage(queue, content, messageAttributes = messageAttributes, awsTraceHeader = awsTraceHeader) + val sendResult = + testClient.sendMessage(queue, content, messageAttributes = messageAttributes, awsTraceHeader = awsTraceHeader) + sendResult shouldBe Right(()) val message = receiveSingleMessageObject(queue, requestedAttributes, requestedSystemAttributes).orNull // then diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala index 6b8a82f7..aee8d133 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/AttributesModule.scala @@ -109,7 +109,7 @@ trait AttributesModule { def read(parameters: Map[String, String]): Map[String, String] = { def collect(suffix: Int, acc: Map[String, String]): Map[String, String] = { parameters.get("Attribute." + suffix + ".Name") match { - case None => acc + case None => acc case Some(an) => collect(suffix + 1, acc + (an -> parameters("Attribute." + suffix + ".Value"))) } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala index 9d16ab01..5653712d 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CancelMessageMoveTaskDirectives.scala @@ -23,7 +23,7 @@ trait CancelMessageMoveTaskDirectives { this: ElasticMQDirectives with QueueURLM await( queueManagerActor ? CancelMessageMoveTask(params.TaskHandle) ) match { - case Left(e: ElasticMQError) => throw e.toSQSException + case Left(e: ElasticMQError) => throw e.toSQSException case Right(approximateNumberOfMessagesMoved) => complete(CancelMessageMoveTaskResponse(approximateNumberOfMessagesMoved)) } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala index b74b28d7..a5a89054 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ChangeMessageVisibilityDirectives.scala @@ -23,7 +23,7 @@ trait ChangeMessageVisibilityDirectives { this: ElasticMQDirectives with Respons ) result.map { case Left(error) => throw error.toSQSException - case Right(_) => + case Right(_) => emptyResponse("ChangeMessageVisibilityResponse") } } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteMessageBatchDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteMessageBatchDirectives.scala index cf7d2a5c..9e5b2cdf 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteMessageBatchDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/DeleteMessageBatchDirectives.scala @@ -27,7 +27,7 @@ trait DeleteMessageBatchDirectives { val result = queueActor ? DeleteMessage(DeliveryReceipt(receiptHandle)) result.flatMap { - case Right(_) => Future.successful(BatchDeleteMessageResponseEntry(id)) + case Right(_) => Future.successful(BatchDeleteMessageResponseEntry(id)) case Left(invalidHandle) => Future.failed(invalidHandle.toSQSException) } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index 4d91b20b..38b50e41 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -142,8 +142,8 @@ trait ReceiveMessageDirectives { ReceivedMessage( Attributes = if (calculatedAttributes.nonEmpty) Some(calculatedAttributes) else None, - Body = message.content, - MD5OfBody = md5Digest(message.content), + Body = message.content.value, + MD5OfBody = md5Digest(message.content.value), MD5OfMessageAttributes = if (filteredMessageAttributes.nonEmpty) Some(md5AttributeDigest(filteredMessageAttributes)) else None, MessageAttributes = if (filteredMessageAttributes.nonEmpty) Some(filteredMessageAttributes) else None, diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ResponseMarshaller.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ResponseMarshaller.scala index d7863ffb..2328c1c1 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ResponseMarshaller.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ResponseMarshaller.scala @@ -25,7 +25,7 @@ trait ResponseMarshaller { this: RespondDirectives => ): Marshaller[T, RequestEntity] = marshallerDependencies.protocol match { case AWSProtocol.`AWSJsonProtocol1.0` => sprayJsonMarshaller[T] - case _ => + case _ => namespace { ns => Marshaller.withFixedContentType[T, RequestEntity](`text/xml(UTF-8)`) { t => val xml = xmlSerializer.toXml(t) % ns diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala index 06ef2ce5..5159451a 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SendMessageDirectives.scala @@ -145,7 +145,7 @@ trait SendMessageDirectives { .get(AwsTraceHeaderSystemAttribute) .map { case StringMessageAttribute(value, _) => TracingId(value) - case NumberMessageAttribute(_, _) => + case NumberMessageAttribute(_, _) => throw SQSException.invalidParameter( s"$AwsTraceHeaderSystemAttribute should be declared as a String, instead it was recognized as a Number" ) @@ -158,7 +158,7 @@ trait SendMessageDirectives { NewMessageData( None, - body, + MessageContent(body), messageAttributes, nextDelivery, messageGroupId, @@ -174,7 +174,7 @@ trait SendMessageDirectives { queueActor: ActorRef, message: NewMessageData ): Future[MessageSendOutcome] = { - val digest = md5Digest(message.content) + val digest = md5Digest(message.content.value) val messageAttributeDigest = if (message.messageAttributes.isEmpty) { None diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/TagsModule.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/TagsModule.scala index d4f20a24..bb8c4f51 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/TagsModule.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/TagsModule.scala @@ -14,7 +14,7 @@ trait TagsModule { } def collect(suffix: Int, acc: Map[String, String]): Map[String, String] = { parameters.get(tagPrefix + suffix + ".Key") match { - case None => acc + case None => acc case Some(an) => parameters.get(tagPrefix + suffix + ".Value") match { case None => collect(suffix + 1, acc + (an -> "")) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ExceptionDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ExceptionDirectives.scala index a256b2a2..402ec144 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ExceptionDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/ExceptionDirectives.scala @@ -2,13 +2,16 @@ package org.elasticmq.rest.sqs.directives import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import org.apache.pekko.http.scaladsl.model.StatusCodes +import org.apache.pekko.http.scaladsl.model.headers.{ModeledCustomHeader, ModeledCustomHeaderCompanion} import org.apache.pekko.http.scaladsl.server.{Directive0, Directives, ExceptionHandler, Route} -import org.elasticmq.rest.sqs.{AWSProtocol, SQSException} import org.elasticmq.rest.sqs.Constants._ +import org.elasticmq.rest.sqs.{AWSProtocol, SQSException} import org.elasticmq.util.Logging import spray.json.DefaultJsonProtocol._ import spray.json.RootJsonFormat +import scala.util.{Success, Try} + trait ExceptionDirectives extends Logging { this: Directives with RespondDirectives => @@ -18,7 +21,10 @@ trait ExceptionDirectives extends Logging { respondWith(e.httpStatusCode) { e.toXml(EmptyRequestId) } - case _ => complete(e.httpStatusCode, ErrorResponse(e.errorType, e.message)) + case _ => + respondWithHeader(`X-Amzn-Query-Error`(e.errorType)) { + complete(e.httpStatusCode, ErrorResponse(e.errorType, e.message)) + } } } @@ -39,4 +45,16 @@ trait ExceptionDirectives extends Logging { implicit val format: RootJsonFormat[ErrorResponse] = jsonFormat2(ErrorResponse.apply) } + final class `X-Amzn-Query-Error`(val value: String) extends ModeledCustomHeader[`X-Amzn-Query-Error`] { + override def renderInRequests: Boolean = false + override def renderInResponses: Boolean = true + override def companion: ModeledCustomHeaderCompanion[`X-Amzn-Query-Error`] = `X-Amzn-Query-Error` + } + + object `X-Amzn-Query-Error` extends ModeledCustomHeaderCompanion[`X-Amzn-Query-Error`] { + override val name: String = "X-Amzn-Query-Error" + + override def parse(value: String): Try[`X-Amzn-Query-Error`] = + Success(new `X-Amzn-Query-Error`(value)) + } } diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/QueueDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/QueueDirectives.scala index ffa07bc4..760c295a 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/QueueDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/QueueDirectives.scala @@ -49,10 +49,10 @@ trait QueueDirectives { defaultMatcher(Uri(queueUrl).path) match { case Matched(_, (_, queueName)) => provide(queueName): Directive1[String] - case Unmatched => + case Unmatched => noAccountIdMatcher(Uri(queueUrl).path) match { case Matched(_, Tuple1(queueName)) => provide(queueName) - case Unmatched => + case Unmatched => reject( MalformedQueryParamRejection( QueueUrlParameter, diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/stats/StatisticsDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/stats/StatisticsDirectives.scala index bfc63340..249d8bf6 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/stats/StatisticsDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/stats/StatisticsDirectives.scala @@ -81,7 +81,7 @@ trait StatisticsDirectives extends StatisticsJsonFormat { queueActorAndDataFromQueueName(queueName) { (queueActor, queueData) => onComplete(getQueryResponseWithAttributesFuture(queueName, queueActor, queueData)) { case Success(value) => complete(value) - case Failure(ex) => + case Failure(ex) => logger.error(s"Error while loading statistics for queue ${queueName}", ex) complete(NotFound, s"Can't load data for queue ${queueName}") }