Commit 9107478a authored by Andreas Muttscheller's avatar Andreas Muttscheller

Add caching for station query lambda

parent 1ffabb90
enablePlugins(GatlingPlugin)
ThisBuild / scalaVersion := "2.12.8"
ThisBuild / scalacOptions := Seq(
"-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation",
"-feature", "-unchecked", "-language:implicitConversions", "-language:postfixOps")
lazy val root = (project in file("."))
.configs(IntegrationTest)
.settings(
Defaults.itSettings,
name := "planned-timetable-fetcher",
organization := "de.codecentric.amuttsch",
description := "Fetch planned timetable from DB and insert it into DynamoDB via Kafka.",
licenses += "Apache License, Version 2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0"),
version := "0.1",
scalaVersion := "2.12.8",
assemblyJarName in assembly := "planned-timetable-fetcher.jar",
libraryDependencies ++= Seq(
......@@ -38,9 +46,15 @@ lazy val root = (project in file("."))
// GraphQL
"org.sangria-graphql" %% "sangria" % "1.4.2",
"org.sangria-graphql" %% "sangria-json4s-native" % "1.0.0",
// Caching
"com.github.blemale" %% "scaffeine" % "2.5.0",
// Gatling
"io.gatling.highcharts" % "gatling-charts-highcharts" % "3.0.3" % "test",
"io.gatling" % "gatling-test-framework" % "3.0.3" % "test",
// Test
"org.scalatest" %% "scalatest" % "3.0.5" % "test,it",
)
)
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Table}
import java.util.concurrent.atomic.AtomicInteger
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.{APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent}
import com.github.blemale.scaffeine.{LoadingCache, Scaffeine}
import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.http.{RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import org.json4s._
......@@ -13,74 +16,74 @@ import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
class APIStationQuery {
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
import com.sksamuel.elastic4s.http.ElasticDsl._
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
val queryParams = event.getQueryStringParameters
if (queryParams.containsKey("eva")) {
try {
val eva = queryParams.get("eva").toInt
logger.info(s"Getting eva $eva")
val stationJson = tableStations.getItem("eva", eva).toJSON
new APIGatewayProxyResponseEvent()
.withBody(stationJson)
.withStatusCode(200)
} catch {
case _: NumberFormatException =>
new APIGatewayProxyResponseEvent()
.withBody(s"Invalid eva format ${queryParams.get("eva")}")
.withStatusCode(400)
case _: NullPointerException =>
new APIGatewayProxyResponseEvent()
.withBody(s"Eva ${queryParams.get("eva")} not found")
.withStatusCode(400)
case e: Exception =>
logger.error("Invalid parameter", e)
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
}
} else if (queryParams.containsKey("q")) {
try {
val esClient = aws.elasticSearchClient()
val esClient: ElasticClient = aws.elasticSearchClient()
val evaCache: LoadingCache[Int, Item] =
Scaffeine()
.expireAfterWrite(1.hour)
.maximumSize(500)
.build((eva: Int) => tableStations.getItem("eva", eva))
val stationNameCache: LoadingCache[String, List[String]] =
Scaffeine()
.expireAfterWrite(1.hour)
.maximumSize(500)
.build { query: String =>
val resp = esClient.execute {
search("stations") matchQuery("name", queryParams.get("q"))
search("stations") matchQuery("name", query)
}.await
esClient.close()
resp match {
case failure: RequestFailure =>
logger.error(failure.error.toString)
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
null
case results: RequestSuccess[SearchResponse] =>
new APIGatewayProxyResponseEvent()
.withBody(write(results.result.hits.hits.map(r => parse(r.sourceAsString))))
.withStatusCode(200)
results.result.hits.hits.map(r => r.sourceAsString).toList
}
} catch {
case _: NumberFormatException =>
}
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
val queryParams = event.getQueryStringParameters
if (queryParams.containsKey("eva")) {
val eva = queryParams.get("eva").toInt
evaCache.get(eva) match {
case item: Item =>
new APIGatewayProxyResponseEvent()
.withBody(s"Invalid eva format ${queryParams.get("eva")}")
.withStatusCode(400)
case _: NullPointerException =>
.withBody(item.toJSON)
.withStatusCode(200)
case null =>
new APIGatewayProxyResponseEvent()
.withBody(s"Eva ${queryParams.get("eva")} not found")
.withBody(s"Eva $eva not found")
.withStatusCode(400)
case e: Exception =>
logger.error("Invalid parameter", e)
}
} else if (queryParams.containsKey("q")) {
val query = queryParams.get("q")
stationNameCache.get(query) match {
case stationList: List[String] =>
new APIGatewayProxyResponseEvent()
.withBody(write(stationList.map(s => parse(s))))
.withStatusCode(200)
case null =>
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
}
} else {
new APIGatewayProxyResponseEvent()
......@@ -90,11 +93,31 @@ class APIStationQuery {
}
}
object APIStationQuery extends App {
object TestDynamoDbTiming extends App {
import scala.concurrent.ExecutionContext.Implicits.global
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
val asq = new APIStationQuery()
val request = new APIGatewayProxyRequestEvent()
.withQueryStringParameters(Map("q" -> "Darmstadt").asJava)
val resp = asq.handleRequest(request, null)
.withQueryStringParameters(Map("eva" -> "8000105").asJava)
val i = new AtomicInteger(0)
val startTime = System.currentTimeMillis()
(0 to 100).map { _ =>
Future {
(0 to 10).foreach { _ =>
asq.handleRequest(request, null)
i.incrementAndGet()
}
}
}.map(f => Await.ready(f, Duration.Inf))
val endTime = System.currentTimeMillis()
println(s"Time for ${i.get} getItems = ${endTime-startTime} ms")
println(s"$resp")
asq.esClient.close()
}
\ No newline at end of file
......@@ -26,7 +26,7 @@ package object aws {
}
def elasticSearchClient(): ElasticClient = {
val esEndpointUrl = "https://"+scala.util.Properties.envOrElse("ELASTICSEARCH_URL", "").split("/")(0)+":80"
val esEndpointUrl = "https://"+scala.util.Properties.envOrElse("ELASTICSEARCH_URL", "localhost").split("/")(0)+":80"
val esProperties = ElasticProperties(esEndpointUrl).endpoints.head
val esEndpoint = s"${esProperties.protocol}://${esProperties.host}:${esProperties.port}"
Aws4ElasticClient(esEndpoint)
......
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda.bench
import ch.qos.logback.classic.{Level, LoggerContext}
import com.typesafe.config.{Config, ConfigFactory}
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
class APIStationQuery extends Simulation {
val context: LoggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
// Suppress logging
context.getLogger("io.gatling").setLevel(Level.valueOf("WARN"))
context.getLogger("io.netty").setLevel(Level.valueOf("WARN"))
val apiConf: Config = ConfigFactory.load("api")
val baseFunctionUrl: String = apiConf.getString("aws.api_gateway_base_url")
val httpProtocol = http
.baseUrl(baseFunctionUrl)
.acceptHeader("text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
.acceptLanguageHeader("en-US,en;q=0.5")
.acceptEncodingHeader("gzip, deflate")
.userAgentHeader("Mozilla/5.0 (X11; Linux x86_64; rv:64.0) Gecko/20100101 Firefox/64.0")
val scnQueryStationByEva = scenario("QueryStationByEva")
.exec(http("QueryStationByEva")
.get("/stations")
.queryParamMap(Map(
"eva" -> "8000105"
))
)
val scnQueryStationByName = scenario("QueryStationByName")
.exec(http("QueryStationByName")
.get("/stations")
.queryParamMap(Map(
"q" -> "Frankfurt"
))
)
setUp(
scnQueryStationByEva.inject(constantConcurrentUsers(1) during(30 seconds)),
scnQueryStationByName.inject(constantConcurrentUsers(1) during(30 seconds)),
).protocols(httpProtocol)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment