Commit 9fb17ef3 authored by Andreas Muttscheller's avatar Andreas Muttscheller

Minor refactoring

parent e4416afb
......@@ -11,6 +11,7 @@ provider:
- Effect: Allow
Action:
- lambda:InvokeFunction
- apigateway:GET
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
......
......@@ -60,7 +60,6 @@ class ScheduledPlannedTimetableFetchTest extends FunSpec with BeforeAndAfterEach
.getValue
.replace(LocalDate.now.format(format), "190215")
.replace("/dbapi/timetables/v1/plan", "dbapi") + ".xml"
println(resourceFileName)
val resp = Source.fromResource(resourceFileName).mkString
......
......@@ -20,8 +20,9 @@ class APISlackBotEventHandler {
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val lambdaClient = AWSLambdaClientBuilder.defaultClient
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
val lambdaClient = AWSLambdaClientBuilder.defaultClient
logger.info(s"Got event ${event.getBody}")
......
......@@ -38,7 +38,7 @@ class APISlackBotEventHandlerWorker {
val spaceSplitRegex = "\"?( |$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
val commands = Map(
val commands: Map[String, SlackCommand] = Map(
SearchStation.cmd -> SearchStation,
AddStation.cmd -> AddStation,
SubscribeDelay.cmd -> SubscribeDelay,
......
......@@ -65,7 +65,7 @@ class APIStationQuery {
case null =>
new APIGatewayProxyResponseEvent()
.withBody(s"Eva $eva not found")
.withStatusCode(400)
.withStatusCode(404)
}
} else if (queryParams.containsKey("q")) {
val query = queryParams.get("q")
......@@ -78,7 +78,7 @@ class APIStationQuery {
case null =>
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
.withStatusCode(404)
}
} else {
......
......@@ -20,7 +20,6 @@ import org.json4s.native.Serialization.write
import scala.concurrent.duration._
class SNSChangeSlackReporter {
implicit val logger: Logger = Logger(this.getClass)
......@@ -107,18 +106,20 @@ class SNSChangeSlackReporter {
sr.channel,
s"`${tti.toString}`"
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}")
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station} and train $tti")
val suJson = tableSlackUsers.getItem("id", sr.user).toJSON
val newSlackUser = parse(suJson).extract[SlackUser]
newSlackUser.seenTimetableInformation += (tti.id -> tti)
newSlackUser.seenTimetableInformation.retain((_, v) => !v.isHistory)
// Update the local reference as well as we use the cache instead of the database for getting our users
sr.seenTimetableInformation += (tti.id -> tti)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", newSlackUser.user)
tableSlackUsers.putItem(item)
}
}
}
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.dynamodbv2.document._
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.amazonaws.services.sns.{AmazonSNS, AmazonSNSClientBuilder}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
......@@ -17,31 +17,26 @@ import org.json4s.native.Serialization.write
import scala.util.{Failure, Success, Try}
object ScheduledChangesFetchService extends App {
new ScheduledChangesFetchService().handleRequest(null, null)
}
class ScheduledChangesFetchService {
implicit val logger: Logger = Logger(this.getClass)
implicit val jsonFormats: Formats = DefaultFormats +
JsonSerializers.NoEmptyStringSerializer +
JsonSerializers.LocalTimeSerializer +
JsonSerializers.LocalDateTimeSerializer
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(this.getClass)
implicit val jsonFormats: Formats = DefaultFormats +
JsonSerializers.NoEmptyStringSerializer +
JsonSerializers.LocalTimeSerializer +
JsonSerializers.LocalDateTimeSerializer
val snsClient: AmazonSNS = AmazonSNSClientBuilder.defaultClient()
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val dbChangedTimetableFetcher: DBChangedTimetableFetcher = new DBChangedTimetableFetcher()
val snsClient = AmazonSNSClientBuilder.defaultClient()
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbChangedTimetableFetcher = new DBChangedTimetableFetcher()
val snsChangedTimetableInformationTopicArn: String =
snsClient.createTopic(aws.getServerlessServiceName("ChangedTimetableStops")).getTopicArn
val snsChangedTimetableInformationTopicArn =
snsClient.createTopic(aws.getServerlessServiceName("ChangedTimetableStops")).getTopicArn
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tablePlannedTimetables: Table = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops: Table = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val evasToFetchItemCollection = tablePlannedTimetables.scan()
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
val evasToFetchItemCollection: ItemCollection[ScanOutcome] = tablePlannedTimetables.scan()
// Download timetable
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
......@@ -68,8 +63,6 @@ class ScheduledChangesFetchService {
val snsMessage = SNSNewDelayInformation(element.eva, newTti)
snsClient.publish(snsChangedTimetableInformationTopicArn, write(snsMessage))
} else {
logger.info("")
}
case Failure(_) =>
logger.error(s"Could not parse JSON: $ttiJson")
......
......@@ -2,11 +2,12 @@ package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import java.time.LocalDate
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.{AWSLambda, AWSLambdaClientBuilder}
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl.{createIndex, _}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
......@@ -19,6 +20,20 @@ import scala.collection.JavaConverters._
class ScheduledPlannedTimetableFetchWorker {
val lambdaClient: AWSLambda = AWSLambdaClientBuilder.defaultClient
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val dbPlannedTimetableFetcher: DBPlannedTimetableFetcher = new DBPlannedTimetableFetcher()
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tablePlannedTimetables: Table = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops: Table = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val esClient: ElasticClient = aws.elasticSearchClient()
esClient.execute {
createIndex("timetables")
}.await
def handleRequest(event: java.util.List[String], context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledPlannedTimetableFetchService.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
......@@ -26,20 +41,6 @@ class ScheduledPlannedTimetableFetchWorker {
val evaList = event.asScala.toList
logger.info(s"Processing first eva of list $evaList")
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbPlannedTimetableFetcher = new DBPlannedTimetableFetcher()
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val esClient = aws.elasticSearchClient()
esClient.execute {
createIndex("timetables")
}.await
// Download timetable
evaList match {
case (eva: String) :: tail =>
......@@ -84,15 +85,6 @@ class ScheduledPlannedTimetableFetchWorker {
}
case _ =>
}
esClient.close()
lambdaClient.shutdown()
}
}
object ScheduledPlannedTimetableFetchWorker extends App {
val sptfw = new ScheduledPlannedTimetableFetchWorker()
val evas = new java.util.LinkedList[String]()
evas.add("8000105")
sptfw.handleRequest(evas, null)
}
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda.gatling
import ch.qos.logback.classic.{Level, LoggerContext}
import com.typesafe.config.{Config, ConfigFactory}
import de.codecentric.amuttsch.bahndelayinfo.aws
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import org.slf4j.LoggerFactory
......@@ -14,8 +14,7 @@ class APIStationQuery extends Simulation {
context.getLogger("io.gatling").setLevel(Level.valueOf("WARN"))
context.getLogger("io.netty").setLevel(Level.valueOf("WARN"))
val apiConf: Config = ConfigFactory.load("api")
val baseFunctionUrl: String = apiConf.getString("aws.api_gateway_base_url")
val baseFunctionUrl: String = aws.getApiGatewayUrl
val httpProtocol = http
.baseUrl(baseFunctionUrl)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment