Commit 9571ad6a authored by Andreas Muttscheller's avatar Andreas Muttscheller

Add IntegrationTest

parent b332fb28
......@@ -3,7 +3,8 @@
# sbt
# (may want to keep parts of 'project')
bin/
project/
project/project
project/target
target/
build/
......
......@@ -5,6 +5,11 @@ ThisBuild / scalacOptions := Seq(
"-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation",
"-feature", "-unchecked", "-language:implicitConversions", "-language:postfixOps")
ThisBuild / javacOptions ++= Seq("-encoding", "UTF-8")
ThisBuild / javaOptions ++= Seq("-Dfile.encoding=UTF-8")
addCommandAlias("cloudItTest", "; it:awsCleanup; it:test; it:awsCleanup")
lazy val root = (project in file("."))
.configs(IntegrationTest)
.settings(
......@@ -18,6 +23,13 @@ lazy val root = (project in file("."))
assemblyJarName in assembly := "planned-timetable-fetcher.jar",
fork in IntegrationTest := true,
testOptions in IntegrationTest += Tests.Argument("-oDF"),
envVars in IntegrationTest := Map(
"SERVERLESS_SERVICE" -> (awsServiceName in IntegrationTest).value,
"SERVERLESS_STAGE" -> (awsStage in IntegrationTest).value,
),
libraryDependencies ++= Seq(
// Common libraries for logging and config
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
......@@ -35,6 +47,7 @@ lazy val root = (project in file("."))
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.488",
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.488",
"com.amazonaws" % "aws-java-sdk-lambda" % "1.11.488",
"com.amazonaws" % "aws-java-sdk-elasticsearch" % "1.11.488",
"com.amazonaws" % "aws-lambda-java-events" % "2.2.5",
"com.amazonaws" % "aws-lambda-java-core" % "1.2.0",
......
import java.util.function.Consumer
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Table}
import com.amazonaws.services.dynamodbv2.model._
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.elasticsearch.AWSElasticsearchClientBuilder
import com.amazonaws.services.elasticsearch.model.DescribeElasticsearchDomainRequest
import com.sksamuel.elastic4s.aws.Aws4ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.ElasticProperties
import sbt._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
object AwsUtils extends AutoPlugin {
override def requires = sbt.plugins.JvmPlugin
override def trigger = allRequirements
object autoImport {
val awsServiceName = taskKey[String]("Service name of the application")
val awsStage = taskKey[String]("Stage of the application")
val awsCleanup = taskKey[Unit]("Cleanup AWS resources like DynamoDB or ElasticSearch")
}
import autoImport._
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
def clearDynamoDBTable(tableName: String)(serviceName: String, stage: String): Future[String] = {
Future {
val fullTableName = s"$serviceName-$stage-$tableName"
println(s"Clearing table $fullTableName")
val table: Table = ddb.getTable(fullTableName)
val tableDescription = table.describe()
val createTableRequest = new CreateTableRequest()
.withTableName(fullTableName)
.withAttributeDefinitions(tableDescription.getAttributeDefinitions)
.withKeySchema(tableDescription.getKeySchema)
.withBillingMode(BillingMode.PAY_PER_REQUEST)
if (tableDescription.getGlobalSecondaryIndexes != null) {
val gsi = new java.util.ArrayList[GlobalSecondaryIndex]()
tableDescription.getGlobalSecondaryIndexes.forEach(new Consumer[GlobalSecondaryIndexDescription] {
override def accept(t: GlobalSecondaryIndexDescription): Unit = {
gsi.add(new GlobalSecondaryIndex()
.withIndexName(t.getIndexName)
.withKeySchema(t.getKeySchema)
.withProjection(t.getProjection)
)
}
})
createTableRequest.setGlobalSecondaryIndexes(gsi)
}
table.delete()
table.waitForDelete()
val newTable = ddb.createTable(createTableRequest)
newTable.waitForActive()
fullTableName
}
}
def clearElasticSearchIndex(indexes: String*)(serviceName: String, stage: String): Future[Unit] = {
Future {
val awsEsClient = AWSElasticsearchClientBuilder.standard().build()
val d = awsEsClient.describeElasticsearchDomain(new DescribeElasticsearchDomainRequest().withDomainName(
s"$serviceName-$stage-es"
)
)
println(s"Clearing ElasticSearch Endpoint: ${d.getDomainStatus.getEndpoint}")
val esEndpointUrl = s"https://${d.getDomainStatus.getEndpoint}:80"
val esProperties = ElasticProperties(esEndpointUrl).endpoints.head
val esEndpoint = s"${esProperties.protocol}://${esProperties.host}:${esProperties.port}"
val esClient = Aws4ElasticClient(esEndpoint)
esClient.execute {
deleteIndex(indexes)
}.await
esClient.close()
println(s"Clearing ElasticSearch Endpoint: ${d.getDomainStatus.getEndpoint}...Done")
}
}
override lazy val projectSettings = Seq(
awsServiceName in IntegrationTest := "delay-info",
awsStage in IntegrationTest := "sbt-it",
awsCleanup in IntegrationTest := {
val serviceName = (IntegrationTest / awsServiceName).value
val stage = (IntegrationTest / awsStage).value
println(s"Clearing AWS resources for $serviceName-$stage")
val elasticFuture = clearElasticSearchIndex("stations", "timetables")(serviceName, stage)
val tables = List("Stations", "SlackUsers", "TimetableStops", "PlannedTimetables")
tables.map(t => (t, clearDynamoDBTable(t)(serviceName, stage))).foreach { f =>
Await.ready(f._2, Duration.Inf).value.get match {
case Success(ftn) =>
println(s"Clearing table $ftn...Done")
case Failure(e) =>
println(s"Error clearing table ${f._1}")
e.printStackTrace()
}
}
Await.ready(elasticFuture, Duration.Inf)
}
)
}
\ No newline at end of file
resolvers += Resolver.sonatypeRepo("public")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
\ No newline at end of file
libraryDependencies ++= Seq(
// AWS
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.488",
"com.amazonaws" % "aws-java-sdk-elasticsearch" % "1.11.488",
// ElasticSearch
"com.sksamuel.elastic4s" % "elastic4s-core_2.12" % "6.3.8",
"com.sksamuel.elastic4s" %% "elastic4s-http" % "6.3.8",
"com.sksamuel.elastic4s" %% "elastic4s-aws" % "6.3.8",
)
\ No newline at end of file
sbt.version = 1.2.7
\ No newline at end of file
addSbtPlugin("io.gatling" % "gatling-sbt" % "3.0.0")
......@@ -69,7 +69,7 @@ functions:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SNSChangeSlackReporter::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
events:
- sns: ChangedTimetableStops
- sns: ${self:service}-${self:provider.stage}-ChangedTimetableStops
APISlackBotEventHandler:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandler::handleRequest
iamRoleStatementsInherit: true
......@@ -169,7 +169,7 @@ resources:
Type: "AWS::Elasticsearch::Domain"
Properties:
ElasticsearchVersion: "6.3"
DomainName: "${self:service}-${self:provider.stage}-elasticsearch"
DomainName: "${self:service}-${self:provider.stage}-es"
ElasticsearchClusterConfig:
DedicatedMasterEnabled: false
InstanceCount: "1"
......
package de.codecentric.amuttsch.bahndelayinfo
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import de.codecentric.amuttsch.bahndelayinfo.aws.{StationImporter, getServerlessServiceName}
import de.codecentric.amuttsch.bahndelayinfo.models.Station
import org.json4s.native.JsonMethods.parse
import org.json4s.{CustomSerializer, DefaultFormats, Formats, JNothing, JString}
import org.scalatest.{BeforeAndAfterAll, FunSpec}
import scala.io.Source
class StationImporterTest extends FunSpec with BeforeAndAfterAll {
private val customSerializer = new CustomSerializer[String](_ => (
{ case JString(s) => s },
{ case "" => JNothing case s: String => JString(s) }
))
private implicit val jsonFormats: Formats = DefaultFormats + customSerializer
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable(getServerlessServiceName("Stations"))
override def beforeAll(): Unit = {
StationImporter.main(Array.empty)
}
describe("StationImporter") {
it("should contain all elements from the json") {
val jsonResource = Source.fromResource("dbStations.json")
val jsonString = jsonResource.getLines().mkString("")
val json = parse(jsonString).extract[List[Station]]
var itemCount = 0
tableStations.scan().forEach(_ => itemCount += 1)
assert(json.size == itemCount)
}
}
}
......@@ -34,7 +34,7 @@ object StationImporter extends App {
json.grouped(20).foreach { stationBatch =>
val stationBatchJson = stationBatch.map(s => write(s))
stationBatchJson foreach println
print(s"Importing ${stationBatchJson.head} \r")
val ops = for(s <- stationBatchJson) yield indexInto("stations", "station").doc(s)
esClient.execute {
......@@ -48,6 +48,7 @@ object StationImporter extends App {
ddb.batchWriteItem(twi)
}
println()
println("Done")
esClient.close()
......
......@@ -27,7 +27,8 @@ class ScheduledChangesFetchService {
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbChangedTimetableFetcher = new DBChangedTimetableFetcher()
val snsChangedTimetableInformationTopicArn = snsClient.createTopic("ChangedTimetableStops").getTopicArn
val snsChangedTimetableInformationTopicArn =
snsClient.createTopic(aws.getServerlessServiceName("ChangedTimetableStops")).getTopicArn
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
......
package de.codecentric.amuttsch.bahndelayinfo
import com.amazonaws.services.dynamodbv2.document.{Item, ItemCollection, ScanOutcome}
import com.amazonaws.services.elasticsearch.AWSElasticsearchClientBuilder
import com.amazonaws.services.elasticsearch.model.DescribeElasticsearchDomainRequest
import com.sksamuel.elastic4s.aws.Aws4ElasticClient
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
......@@ -26,7 +28,14 @@ package object aws {
}
def elasticSearchClient(): ElasticClient = {
val esEndpointUrl = "https://"+scala.util.Properties.envOrElse("ELASTICSEARCH_URL", "localhost").split("/")(0)+":80"
val service = scala.util.Properties.envOrElse("SERVERLESS_SERVICE", "")
val stage = scala.util.Properties.envOrElse("SERVERLESS_STAGE", "")
val awsEsClient = AWSElasticsearchClientBuilder.standard().build()
val d = awsEsClient.describeElasticsearchDomain(new DescribeElasticsearchDomainRequest().withDomainName(
s"$service-$stage-es"
))
val esEndpointUrl = s"https://${d.getDomainStatus.getEndpoint}:80"
val esProperties = ElasticProperties(esEndpointUrl).endpoints.head
val esEndpoint = s"${esProperties.protocol}://${esProperties.host}:${esProperties.port}"
Aws4ElasticClient(esEndpoint)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment