Commit d2f2c9e5 authored by Andreas Muttscheller's avatar Andreas Muttscheller

Improve DynamoDB access on ScheduledChangesFetchService

parent 83e1f591
......@@ -61,7 +61,7 @@ lazy val root = (project in file("."))
"org.sangria-graphql" %% "sangria" % "1.4.2",
"org.sangria-graphql" %% "sangria-json4s-native" % "1.0.0",
// Caching4
// Caching
"com.github.blemale" %% "scaffeine" % "2.5.0",
// Gatling
......
......@@ -52,11 +52,11 @@ functions:
ScheduledChangesFetchService:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledChangesFetchService::handleRequest
events:
# Monday to Friday - every 2 minutes - from 6-10 and 15-19
# Monday to Friday - every minute - from 6-10 and 15-19
# UTC !!!
#- schedule: cron(0/2 5-10 ? * MON-FRI *)
#- schedule: cron(0/2 14-19 ? * MON-FRI *)
- schedule: cron(0/2 5-19 ? * MON-FRI *)
- schedule: cron(0/1 5-19 ? * MON-FRI *)
SlackChangeReporter:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SNSChangeSlackReporter::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
......
......@@ -6,11 +6,11 @@ import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import com.github.blemale.scaffeine.{LoadingCache, Scaffeine}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.models.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.slackbot.{SlackClient, SlackUser}
import de.codecentric.amuttsch.bahndelayinfo.utils.JsonSerializers
import de.codecentric.amuttsch.bahndelayinfo.utils.TraversableImplicits._
......@@ -18,10 +18,12 @@ import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import scala.util.{Success, Try}
import scala.concurrent.duration._
class SNSChangeSlackReporter {
implicit val logger: Logger = Logger(SNSChangeSlackReporter.getClass)
implicit val logger: Logger = Logger(this.getClass)
implicit val jsonFormats: Formats = DefaultFormats +
JsonSerializers.LocalDateTimeSerializer +
JsonSerializers.LocalTimeSerializer
......@@ -34,7 +36,11 @@ class SNSChangeSlackReporter {
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableSlackUsers: Table = ddb.getTable(aws.getServerlessServiceName("SlackUsers"))
val tableTimetableStops: Table = ddb.getTable(aws.getServerlessServiceName("TimetableStops"))
val slackUserCache: LoadingCache[Unit, Set[SlackUser]] =
Scaffeine()
.expireAfterWrite(5.minutes)
.build((_: Unit) => getSlackUsers)
def handleRequest(event: SNSEvent, context: Context): Unit = {
val eventMessage = event.getRecords.get(0).getSNS.getMessage
......@@ -56,55 +62,42 @@ class SNSChangeSlackReporter {
}
def parseDelayInformation(newDelayInformation: SNSNewDelayInformation): Unit = {
Some(newDelayInformation.tti)
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 2)
.cross(slackUserCache.get())
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
}
.flatMap { case (tti, sr) =>
sr.delayRegistration.map(d => (tti, sr, d))
}
.filter { case (tti, _, d) => d.eva == tti.eva &&
(tti.train.matches(d.trainRegex) || tti.train.startsWith(d.trainRegex) || tti.train.replace(" ", "").startsWith(d.trainRegex)) }
.filter { case (tti, _, d) => d.direction.isEmpty || tti.pathTo.contains(d.direction.get) }
.filter { case (tti, _, d) => d.from.isEmpty || d.until.isEmpty ||
(d.from.isDefined && d.until.isDefined &&
d.from.get.isBefore(LocalTime.from(tti.plannedTime)) && d.until.get.isAfter(LocalTime.from(tti.plannedTime))) }
.foreach { case (tti, sr, _) =>
slackApiClient.postMessage(
sr.channel,
s"`${tti.toString}`"
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}")
val suJson = tableSlackUsers.getItem("id", sr.user).toJSON
val newSlackUser = parse(suJson).extract[SlackUser]
newSlackUser.seenTimetableInformation += (tti.id -> tti)
newSlackUser.seenTimetableInformation.retain((_, v) => !v.isHistory)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", newSlackUser.user)
tableSlackUsers.putItem(item)
}
val stops = tableTimetableStops.scan()
stops.forEach { o =>
Some(o)
.map(item => Try(TimetableInformation.fromJson(item.getJSON("tti"))))
.collect { case Success(value) => value}
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 2)
.cross(getSlackUsers)
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
}
.flatMap { case (tti, sr) =>
sr.delayRegistration.map(d => (tti, sr, d))
}
.filter { case (tti, _, d) => d.eva == tti.eva &&
(tti.train.matches(d.trainRegex) || tti.train.startsWith(d.trainRegex) || tti.train.replace(" ", "").startsWith(d.trainRegex)) }
.filter { case (tti, _, d) => d.direction.isEmpty || tti.pathTo.contains(d.direction.get) }
.filter { case (tti, _, d) => d.from.isEmpty || d.until.isEmpty ||
(d.from.isDefined && d.until.isDefined &&
d.from.get.isBefore(LocalTime.from(tti.plannedTime)) && d.until.get.isAfter(LocalTime.from(tti.plannedTime))) }
.foreach { case (tti, sr, _) =>
slackApiClient.postMessage(
sr.channel,
s"`${tti.toString}`"
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}")
val suJson = tableSlackUsers.getItem("id", sr.user).toJSON
val newSlackUser = parse(suJson).extract[SlackUser]
newSlackUser.seenTimetableInformation += (tti.id -> tti)
newSlackUser.seenTimetableInformation.retain((_, v) => !v.isHistory)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", newSlackUser.user)
tableSlackUsers.putItem(item)
}
}
}
}
object SNSChangeSlackReporter extends App {
val r = new SNSChangeSlackReporter()
r.parseDelayInformation(
SNSNewDelayInformation("8000105", 52)
)
}
\ No newline at end of file
......@@ -20,7 +20,7 @@ import scala.util.{Failure, Success, Try}
class ScheduledChangesFetchService {
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledChangesFetchService.getClass)
implicit val logger: Logger = Logger(this.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val snsClient = AmazonSNSClientBuilder.defaultClient()
......@@ -51,22 +51,24 @@ class ScheduledChangesFetchService {
val newTti = ttiDb.updateDelay(tti)
// Update entry in dynamodb
val newItem = new Item()
.withPrimaryKey("id", newTti.id)
.withJSON("tti", newTti.toJson)
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(newItem)
if (tti != newTti) {
val newItem = new Item()
.withPrimaryKey("id", newTti.id)
.withJSON("tti", newTti.toJson)
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(newItem)
val snsMessage = SNSNewDelayInformation(element.eva, newTti)
snsClient.publish(snsChangedTimetableInformationTopicArn, write(snsMessage))
} else {
logger.info("")
}
case Failure(_) =>
logger.error(s"Could not parse JSON: $ttiJson")
tableTimetableStops.deleteItem("id", tti.id)
}
}
}
val snsMessage = SNSNewDelayInformation(element.eva, timetableInformations.size)
snsClient.publish(snsChangedTimetableInformationTopicArn, write(snsMessage))
}
}
}
object ScheduledChangesFetchService
\ No newline at end of file
}
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.sns
import de.codecentric.amuttsch.bahndelayinfo.models.TimetableInformation
sealed trait SNSMessage
case class SNSNewDelayInformation(
eva: String,
amount: Int
tti: TimetableInformation
) extends SNSMessage
......@@ -7,7 +7,7 @@ import de.codecentric.amuttsch.bahndelayinfo.models.TimetableInformation
class DBChangedTimetableFetcher extends DBFetcher {
override def fetchUrl(ttr: TimetableRequest)(implicit apiConf: Config): Uri = {
uri"${apiConf.getString("dbapi.url")}/timetables/v1/fchg/${ttr.eva}"
uri"${apiConf.getString("dbapi.url")}/timetables/v1/rchg/${ttr.eva}"
}
override def fetchTimetablesForEvas(evaToFetch: List[String]): Seq[TimetableInformation] = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment