Commit b332fb28 authored by Andreas Muttscheller's avatar Andreas Muttscheller

Add service name and stage to DynamoDB tables and ElasticSearch

parent 7853e7da
......@@ -73,6 +73,7 @@ functions:
APISlackBotEventHandler:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandler::handleRequest
iamRoleStatementsInherit: true
timeout: 30
events:
- schedule: cron(0/2 6-18 ? * MON-FRI *)
- http:
......@@ -86,6 +87,7 @@ functions:
APIStationQuery:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APIStationQuery::handleRequest
iamRoleStatementsInherit: true
timeout: 30
events:
- http:
path: stations
......@@ -93,6 +95,7 @@ functions:
APIGraphQLHandler:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APIGraphQLHandler::handleRequest
iamRoleStatementsInherit: true
timeout: 30
events:
- http:
path: graphql
......@@ -103,7 +106,7 @@ resources:
Stations:
Type: AWS::DynamoDB::Table
Properties:
TableName: Stations
TableName: ${self:service}-${self:provider.stage}-Stations
AttributeDefinitions:
- AttributeName: eva
AttributeType: N
......@@ -123,7 +126,7 @@ resources:
TimeTableStopTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: TimetableStops
TableName: ${self:service}-${self:provider.stage}-TimetableStops
AttributeDefinitions:
- AttributeName: id
AttributeType: S
......@@ -137,7 +140,7 @@ resources:
PlannedTimetables:
Type: AWS::DynamoDB::Table
Properties:
TableName: PlannedTimetables
TableName: ${self:service}-${self:provider.stage}-PlannedTimetables
AttributeDefinitions:
- AttributeName: eva
AttributeType: S
......@@ -151,7 +154,7 @@ resources:
SlackUsers:
Type: AWS::DynamoDB::Table
Properties:
TableName: SlackUsers
TableName: ${self:service}-${self:provider.stage}-SlackUsers
AttributeDefinitions:
- AttributeName: id
AttributeType: S
......@@ -166,7 +169,7 @@ resources:
Type: "AWS::Elasticsearch::Domain"
Properties:
ElasticsearchVersion: "6.3"
DomainName: "bahn-delay-information"
DomainName: "${self:service}-${self:provider.stage}-elasticsearch"
ElasticsearchClusterConfig:
DedicatedMasterEnabled: false
InstanceCount: "1"
......
package de.codecentric.amuttsch.bahndelayinfo.aws
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table, TableWriteItems}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.models.Station
......@@ -19,7 +19,7 @@ object StationImporter extends App {
private implicit val jsonFormats: Formats = DefaultFormats + customSerializer
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
val tableStations: Table = ddb.getTable(getServerlessServiceName("Stations"))
val esClient = aws.elasticSearchClient()
esClient.execute {
......@@ -31,16 +31,21 @@ object StationImporter extends App {
val json = parse(jsonString).extract[List[Station]]
json foreach { obj =>
val jsonObj = write(obj)
println(jsonObj)
json.grouped(20).foreach { stationBatch =>
val stationBatchJson = stationBatch.map(s => write(s))
stationBatchJson foreach println
val ops = for(s <- stationBatchJson) yield indexInto("stations", "station").doc(s)
esClient.execute {
indexInto("stations", "station").doc(jsonObj)
bulk(ops: _*)
}.await
tableStations.putItem(
Item
.fromJSON(jsonObj)
)
val items = for(s <- stationBatchJson) yield Item.fromJSON(s)
val twi = new TableWriteItems(getServerlessServiceName("Stations"))
.withItemsToPut(items: _*)
ddb.batchWriteItem(twi)
}
println("Done")
......
......@@ -28,7 +28,7 @@ class APISlackBotEventHandler {
event.getBody
}
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("APISlackBotEventHandlerWorker"))
.withFunctionName(aws.getServerlessServiceName("APISlackBotEventHandlerWorker"))
.withPayload(write(body))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
......
......@@ -6,6 +6,7 @@ import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.slackbot.{SearchStation, _}
import de.codecentric.amuttsch.bahndelayinfo.utils.FiniteQueue._
import de.codecentric.amuttsch.bahndelayinfo.utils.JsonSerializers
......@@ -25,9 +26,9 @@ class APISlackBotEventHandlerWorker {
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableSlackUsers: Table = ddb.getTable("SlackUsers")
val tablePlannedTimetables: Table = ddb.getTable("PlannedTimetables")
val tableStations: Table = ddb.getTable("Stations")
val tableSlackUsers: Table = ddb.getTable(aws.getServerlessServiceName("SlackUsers"))
val tablePlannedTimetables: Table = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableStations: Table = ddb.getTable(aws.getServerlessServiceName("Stations"))
val indexStationName: Index = tableStations.getIndex("NameIndex")
val apiConf: Config = ConfigFactory.load("api")
......
......@@ -23,7 +23,7 @@ class APIStationQuery {
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
val tableStations: Table = ddb.getTable(aws.getServerlessServiceName("Stations"))
val esClient: ElasticClient = aws.elasticSearchClient()
......
......@@ -8,6 +8,7 @@ import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.models.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.slackbot.{SlackClient, SlackUser}
......@@ -32,8 +33,8 @@ class SNSChangeSlackReporter {
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableSlackUsers: Table = ddb.getTable("SlackUsers")
val tableTimetableStops: Table = ddb.getTable("TimetableStops")
val tableSlackUsers: Table = ddb.getTable(aws.getServerlessServiceName("SlackUsers"))
val tableTimetableStops: Table = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
def handleRequest(event: SNSEvent, context: Context): Unit = {
val eventMessage = event.getRecords.get(0).getSNS.getMessage
......@@ -43,8 +44,7 @@ class SNSChangeSlackReporter {
parseDelayInformation(newDelayInformation)
}
def parseDelayInformation(newDelayInformation: SNSNewDelayInformation): Unit = {
def getSlackUsers: Set[SlackUser] = {
var slackRecipients = Set.empty[SlackUser]
val slackUserItems = tableSlackUsers.scan()
......@@ -52,6 +52,11 @@ class SNSChangeSlackReporter {
slackRecipients = slackRecipients + parse(item.toJSON).extract[SlackUser]
}
slackRecipients
}
def parseDelayInformation(newDelayInformation: SNSNewDelayInformation): Unit = {
val stops = tableTimetableStops.scan()
stops.forEach { o =>
......@@ -61,7 +66,7 @@ class SNSChangeSlackReporter {
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 2)
.cross(slackRecipients)
.cross(getSlackUsers)
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
......
......@@ -6,6 +6,7 @@ import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBChangedTimetableFetcher
import de.codecentric.amuttsch.bahndelayinfo.models.{PlannedTimetableElement, TimetableInformation}
......@@ -29,8 +30,8 @@ class ScheduledChangesFetchService {
val snsChangedTimetableInformationTopicArn = snsClient.createTopic("ChangedTimetableStops").getTopicArn
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val evasToFetchItemCollection = tablePlannedTimetables.scan()
// Download timetable
......
......@@ -3,14 +3,13 @@ package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import java.time.LocalDate
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, ScanFilter}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, ScanFilter}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBPlannedTimetableFetcher
import de.codecentric.amuttsch.bahndelayinfo.models.PlannedTimetableElement
import org.json4s._
import org.json4s.native.JsonMethods._
......@@ -29,7 +28,7 @@ class ScheduledPlannedTimetableFetchService {
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val sf: ScanFilter = new ScanFilter("lastFetched").ne(LocalDate.now.toString)
val evasToFetchItemCollection = tablePlannedTimetables.scan(
sf
......@@ -45,7 +44,7 @@ class ScheduledPlannedTimetableFetchService {
val json = write(evaToFetchList.map(_.eva).toList)
logger.info(s"Sending $json")
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withFunctionName(aws.getServerlessServiceName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(json)
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
......
......@@ -7,16 +7,15 @@ import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.sksamuel.elastic4s.http.ElasticDsl.createIndex
import com.sksamuel.elastic4s.http.ElasticDsl.{createIndex, _}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBPlannedTimetableFetcher
import de.codecentric.amuttsch.bahndelayinfo.models.PlannedTimetableElement
import org.json4s._
import org.json4s.native.Serialization.write
import scala.collection.JavaConverters._
import com.sksamuel.elastic4s.http.ElasticDsl._
import de.codecentric.amuttsch.bahndelayinfo.models.PlannedTimetableElement
class ScheduledPlannedTimetableFetchWorker {
......@@ -33,8 +32,8 @@ class ScheduledPlannedTimetableFetchWorker {
val dbPlannedTimetableFetcher = new DBPlannedTimetableFetcher()
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val esClient = aws.elasticSearchClient()
esClient.execute {
......@@ -78,7 +77,7 @@ class ScheduledPlannedTimetableFetchWorker {
val json = write(tail)
logger.info(s"Invoking next lambda with $json")
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withFunctionName(aws.getServerlessServiceName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(json)
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
......
......@@ -7,12 +7,12 @@ import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
import scala.collection.mutable
package object aws {
def getLambdaFunctionName(functionName: String): String = {
def getServerlessServiceName(serviceName: String): String = {
val service = scala.util.Properties.envOrElse("SERVERLESS_SERVICE", "")
val stage = scala.util.Properties.envOrElse("SERVERLESS_STAGE", "")
if (service.nonEmpty && stage.nonEmpty) {
s"$service-$stage-$functionName"
s"$service-$stage-$serviceName"
} else {
throw new RuntimeException(s"Environment variables SERVERLESS_SERVICE ($service) or SERVERLESS_STAGE ($stage) missing!")
}
......
package de.codecentric.amuttsch.bahndelayinfo.graphql
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.softwaremill.sttp._
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.models.Station
import org.json4s._
import org.json4s.native.JsonMethods._
......@@ -14,7 +15,7 @@ class Repository {
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
val tableStations: Table = ddb.getTable(aws.getServerlessServiceName("Stations"))
def getStationByEva(eva: Int): Option[Station] = {
tableStations.getItem("eva", eva) match {
......
......@@ -56,9 +56,9 @@ sealed trait DynamoDbConnection extends SlackCommand {
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableSlackUsers: Table = ddb.getTable("SlackUsers")
val tablePlannedTimetables: Table = ddb.getTable("PlannedTimetables")
val tableStations: Table = ddb.getTable("Stations")
val tableSlackUsers: Table = ddb.getTable(aws.getServerlessServiceName("SlackUsers"))
val tablePlannedTimetables: Table = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableStations: Table = ddb.getTable(aws.getServerlessServiceName("Stations"))
val indexStationName: Index = tableStations.getIndex("NameIndex")
}
......@@ -160,7 +160,7 @@ case object AddStation extends SlackCommand with StationFinder {
case Right(s) =>
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withFunctionName(aws.getServerlessServiceName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(write(List(s.eva.toString)))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment