...
 
Commits (2)
......@@ -11,6 +11,7 @@ provider:
- Effect: Allow
Action:
- lambda:InvokeFunction
- apigateway:GET
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
......
......@@ -60,7 +60,6 @@ class ScheduledPlannedTimetableFetchTest extends FunSpec with BeforeAndAfterEach
.getValue
.replace(LocalDate.now.format(format), "190215")
.replace("/dbapi/timetables/v1/plan", "dbapi") + ".xml"
println(resourceFileName)
val resp = Source.fromResource(resourceFileName).mkString
......
......@@ -20,8 +20,9 @@ class APISlackBotEventHandler {
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val lambdaClient = AWSLambdaClientBuilder.defaultClient
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
val lambdaClient = AWSLambdaClientBuilder.defaultClient
logger.info(s"Got event ${event.getBody}")
......
......@@ -38,7 +38,7 @@ class APISlackBotEventHandlerWorker {
val spaceSplitRegex = "\"?( |$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
val commands = Map(
val commands: Map[String, SlackCommand] = Map(
SearchStation.cmd -> SearchStation,
AddStation.cmd -> AddStation,
SubscribeDelay.cmd -> SubscribeDelay,
......
......@@ -65,7 +65,7 @@ class APIStationQuery {
case null =>
new APIGatewayProxyResponseEvent()
.withBody(s"Eva $eva not found")
.withStatusCode(400)
.withStatusCode(404)
}
} else if (queryParams.containsKey("q")) {
val query = queryParams.get("q")
......@@ -78,7 +78,7 @@ class APIStationQuery {
case null =>
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
.withStatusCode(404)
}
} else {
......
......@@ -20,7 +20,6 @@ import org.json4s.native.Serialization.write
import scala.concurrent.duration._
class SNSChangeSlackReporter {
implicit val logger: Logger = Logger(this.getClass)
......@@ -63,41 +62,64 @@ class SNSChangeSlackReporter {
def parseDelayInformation(newDelayInformation: SNSNewDelayInformation): Unit = {
Some(newDelayInformation.tti)
// eva must be correct
.filter(_.eva == newDelayInformation.eva)
// The train must depart in the future
.filter(!_.isHistory)
// Show only delays with more than 2 minutes
.filter(_.delayInMinutes > 2)
// Cross timetable information with all users
.cross(slackUserCache.get(Unit))
// Filter only users that haven't seen this delay or if the difference between the
// last delay shown is more than 2 minutes
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
}
// Flatten the list and add the delay registration information to it
.flatMap { case (tti, sr) =>
sr.delayRegistration.map(d => (tti, sr, d))
}
// Compare the train name of the delay with the given user input
// Regex works as well as prefixing the train name, e.g. RB.* or RB68 or RB 68
.filter { case (tti, _, d) => d.eva == tti.eva &&
(tti.train.matches(d.trainRegex) || tti.train.startsWith(d.trainRegex) || tti.train.replace(" ", "").startsWith(d.trainRegex)) }
// If the user wants only trains going in a specific direction, filter the results
.filter { case (tti, _, d) => d.direction.isEmpty || tti.pathTo.contains(d.direction.get) }
// Check if the planned departure time is in the given time range
.filter { case (tti, _, d) => d.from.isEmpty || d.until.isEmpty ||
(d.from.isDefined && d.until.isDefined &&
d.from.get.isBefore(LocalTime.from(tti.plannedTime)) && d.until.get.isAfter(LocalTime.from(tti.plannedTime))) }
// All checks have passed, send the delay information to the user
.foreach { case (tti, sr, _) =>
slackApiClient.postMessage(
sr.channel,
s"`${tti.toString}`"
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}")
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station} and train $tti")
val suJson = tableSlackUsers.getItem("id", sr.user).toJSON
val newSlackUser = parse(suJson).extract[SlackUser]
newSlackUser.seenTimetableInformation += (tti.id -> tti)
newSlackUser.seenTimetableInformation.retain((_, v) => !v.isHistory)
// Update the local reference as well as we use the cache instead of the database for getting our users
sr.seenTimetableInformation += (tti.id -> tti)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", newSlackUser.user)
tableSlackUsers.putItem(item)
}
}
}
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.dynamodbv2.document._
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.amazonaws.services.sns.{AmazonSNS, AmazonSNSClientBuilder}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBChangedTimetableFetcher
import de.codecentric.amuttsch.bahndelayinfo.models.{PlannedTimetableElement, TimetableInformation}
import de.codecentric.amuttsch.bahndelayinfo.utils.JsonSerializers
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import scala.util.{Failure, Success, Try}
class ScheduledChangesFetchService {
implicit val logger: Logger = Logger(this.getClass)
implicit val jsonFormats: Formats = DefaultFormats +
JsonSerializers.NoEmptyStringSerializer +
JsonSerializers.LocalTimeSerializer +
JsonSerializers.LocalDateTimeSerializer
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(this.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val snsClient: AmazonSNS = AmazonSNSClientBuilder.defaultClient()
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val dbChangedTimetableFetcher: DBChangedTimetableFetcher = new DBChangedTimetableFetcher()
val snsClient = AmazonSNSClientBuilder.defaultClient()
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbChangedTimetableFetcher = new DBChangedTimetableFetcher()
val snsChangedTimetableInformationTopicArn: String =
snsClient.createTopic(aws.getServerlessServiceName("ChangedTimetableStops")).getTopicArn
val snsChangedTimetableInformationTopicArn =
snsClient.createTopic(aws.getServerlessServiceName("ChangedTimetableStops")).getTopicArn
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tablePlannedTimetables: Table = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops: Table = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val evasToFetchItemCollection = tablePlannedTimetables.scan()
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
val evasToFetchItemCollection: ItemCollection[ScanOutcome] = tablePlannedTimetables.scan()
// Download timetable
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
......@@ -60,8 +63,6 @@ class ScheduledChangesFetchService {
val snsMessage = SNSNewDelayInformation(element.eva, newTti)
snsClient.publish(snsChangedTimetableInformationTopicArn, write(snsMessage))
} else {
logger.info("")
}
case Failure(_) =>
logger.error(s"Could not parse JSON: $ttiJson")
......
......@@ -2,11 +2,12 @@ package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import java.time.LocalDate
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.{AWSLambda, AWSLambdaClientBuilder}
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl.{createIndex, _}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
......@@ -19,6 +20,20 @@ import scala.collection.JavaConverters._
class ScheduledPlannedTimetableFetchWorker {
val lambdaClient: AWSLambda = AWSLambdaClientBuilder.defaultClient
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val dbPlannedTimetableFetcher: DBPlannedTimetableFetcher = new DBPlannedTimetableFetcher()
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tablePlannedTimetables: Table = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops: Table = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val esClient: ElasticClient = aws.elasticSearchClient()
esClient.execute {
createIndex("timetables")
}.await
def handleRequest(event: java.util.List[String], context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledPlannedTimetableFetchService.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
......@@ -26,20 +41,6 @@ class ScheduledPlannedTimetableFetchWorker {
val evaList = event.asScala.toList
logger.info(s"Processing first eva of list $evaList")
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbPlannedTimetableFetcher = new DBPlannedTimetableFetcher()
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable(aws.getServerlessServiceName("PlannedTimetables"))
val tableTimetableStops = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val esClient = aws.elasticSearchClient()
esClient.execute {
createIndex("timetables")
}.await
// Download timetable
evaList match {
case (eva: String) :: tail =>
......@@ -84,15 +85,6 @@ class ScheduledPlannedTimetableFetchWorker {
}
case _ =>
}
esClient.close()
lambdaClient.shutdown()
}
}
object ScheduledPlannedTimetableFetchWorker extends App {
val sptfw = new ScheduledPlannedTimetableFetchWorker()
val evas = new java.util.LinkedList[String]()
evas.add("8000105")
sptfw.handleRequest(evas, null)
}
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda.gatling
import ch.qos.logback.classic.{Level, LoggerContext}
import com.typesafe.config.{Config, ConfigFactory}
import de.codecentric.amuttsch.bahndelayinfo.aws
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import org.slf4j.LoggerFactory
......@@ -14,8 +14,7 @@ class APIStationQuery extends Simulation {
context.getLogger("io.gatling").setLevel(Level.valueOf("WARN"))
context.getLogger("io.netty").setLevel(Level.valueOf("WARN"))
val apiConf: Config = ConfigFactory.load("api")
val baseFunctionUrl: String = apiConf.getString("aws.api_gateway_base_url")
val baseFunctionUrl: String = aws.getApiGatewayUrl
val httpProtocol = http
.baseUrl(baseFunctionUrl)
......