Commit 5516887f authored by Andreas Muttscheller's avatar Andreas Muttscheller

Work

parent 1442eb42
......@@ -16,7 +16,7 @@ libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3",
// Http client
"com.softwaremill.sttp" %% "core" % "1.5.2",
"com.softwaremill.sttp" %% "core" % "1.5.4",
// xml and json
"org.scala-lang.modules" %% "scala-xml" % "1.1.1",
......@@ -26,10 +26,10 @@ libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "2.1.0",
// AWS
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.475",
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.479",
"com.amazonaws" % "aws-java-sdk-lambda" % "1.11.479",
"com.amazonaws" % "aws-lambda-java-events" % "2.2.4",
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.483",
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.483",
"com.amazonaws" % "aws-java-sdk-lambda" % "1.11.483",
"com.amazonaws" % "aws-lambda-java-events" % "2.2.5",
"com.amazonaws" % "aws-lambda-java-core" % "1.2.0",
// Slack
......
......@@ -14,6 +14,7 @@ provider:
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:Query
- sns:CreateTopic
- sns:Publish
- es:ESHttpPost
......@@ -59,8 +60,9 @@ functions:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledChangesFetchService::handleRequest
events:
# Monday to Friday - every 2 minutes - from 6-10 and 15-19
- schedule: cron(0/2 6-10 ? * MON-FRI *)
- schedule: cron(0/2 15-19 ? * MON-FRI *)
# UTC !!!
- schedule: cron(0/2 5-10 ? * MON-FRI *)
- schedule: cron(0/2 14-19 ? * MON-FRI *)
SlackChangeReporter:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SNSChangeSlackReporter::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
......@@ -76,6 +78,7 @@ functions:
method: post
APISlackBotEventHandlerWorker:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandlerWorker::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
APIStationQuery:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APIStationQuery::handleRequest
iamRoleStatementsInherit: true
......
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import java.net.URLDecoder
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
......@@ -30,11 +32,21 @@ class APISlackBotEventHandler {
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("APISlackBotEventHandlerWorker"))
.withPayload(write(event.getBody))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
logger.info(s"Got event ${event.getBody}")
if (event.getBody != null) {
val payloadStr = "payload="
val body = if (event.getBody.startsWith(payloadStr)) {
URLDecoder.decode(event.getBody.substring(payloadStr.length), "UTF-8")
} else {
event.getBody
}
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("APISlackBotEventHandlerWorker"))
.withPayload(write(body))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
}
new APIGatewayProxyResponseEvent()
.withStatusCode(200)
......
......@@ -69,7 +69,7 @@ class SNSChangeSlackReporter {
.foreach { case (tti, sr) =>
val resp = slackApiClient.postChatMessage(
sr.channel,
tti.toString
s"`${tti.toString}`"
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}: $resp")
......
......@@ -7,14 +7,16 @@ import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.sksamuel.elastic4s.{IndexAndTypes, IndexesAndTypes}
import com.sksamuel.elastic4s.http.ElasticDsl.createIndex
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.PlannedTimetableElement
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBPlannedTimetableFetcher
import org.json4s._
import org.json4s.native.Serialization.write
import scala.collection.JavaConverters._
import com.sksamuel.elastic4s.http.ElasticDsl._
class ScheduledPlannedTimetableFetchWorker {
......@@ -34,14 +36,27 @@ class ScheduledPlannedTimetableFetchWorker {
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
val esClient = aws.elasticSearchClient()
esClient.execute {
createIndex("timetables")
}.await
// Download timetable
evaList match {
case (eva: String) :: tail =>
val evaToFetch = List(eva)
esClient.execute {
deleteByQuery("timetables", "stop", s"""eva:"$eva"""")
}.await
val timetableInformations = dbPlannedTimetableFetcher.fetchTimetablesForEvas(evaToFetch)
timetableInformations.foreach { tti =>
esClient.execute {
indexInto("timetables", "stop").doc(tti.toJson)
}.await
val item = new Item()
.withPrimaryKey("id", tti.id)
.withJSON("tti", tti.toJson)
......@@ -52,16 +67,26 @@ class ScheduledPlannedTimetableFetchWorker {
val newElement = aws.PlannedTimetableElement(eva, LocalDate.now.toString)
tablePlannedTimetables.putItem(Item.fromJSON(write(newElement)))
val json = write(tail)
logger.info(s"Invoking next lambda with $json")
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(json)
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
if (tail.nonEmpty) {
val json = write(tail)
logger.info(s"Invoking next lambda with $json")
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(json)
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
}
case _ =>
}
esClient.close()
lambdaClient.shutdown()
}
}
object ScheduledPlannedTimetableFetchWorker extends App {
val sptfw = new ScheduledPlannedTimetableFetchWorker()
val evas = new java.util.LinkedList[String]()
evas.add("8001377")
sptfw.handleRequest(evas, null)
}
......@@ -45,20 +45,29 @@ case class TimetableInformation(
}
}
override def toString: String = {
override def toString: String = toString(printDelay = true)
def toString(printDelay: Boolean = true): String = {
val prettyPlannedTime = TimetableInformation.printTimeFormat.format(plannedTime)
val prettyCurrentTime = TimetableInformation.printTimeFormat.format(currentTime)
val delay = TimeUnit.MILLISECONDS.toMinutes(currentTime.getTime - plannedTime.getTime)
val direction = if (finalStop) "arrival " else "departure"
if (cancelled) {
f"$station: $train%-20s from $from%-20s to $to%-20s: planned $direction $prettyPlannedTime - CANCELLED!"
} else if (delayInMinutes == 0) {
f"$station: $train%-20s from $from%-20s to $to%-20s: planned $direction $prettyPlannedTime - now $prettyCurrentTime, is on time! "
val plannedTimeString = f"$station: $train%-20s from $from%-20s to $to%-20s: planned $direction $prettyPlannedTime"
if (printDelay) {
if (cancelled) {
f"$plannedTimeString - CANCELLED!"
} else if (delayInMinutes == 0) {
f"$plannedTimeString - now $prettyCurrentTime, is on time! "
} else {
f"$plannedTimeString - now $prettyCurrentTime, is delayed by $delay%2d minutes!"
}
} else {
f"$station: $train%-20s from $from%-20s to $to%-20s: planned $direction $prettyPlannedTime - now $prettyCurrentTime, is delayed by $delay%2d minutes!"
plannedTimeString
}
}
def updateDelay(newTti: TimetableInformation): TimetableInformation = {
......
......@@ -7,7 +7,9 @@ import scala.collection.mutable
case class DelayRegistration(
trainRegex: String,
eva: String
)
) {
override def toString: String = s""
}
case class SlackUser(
user: String,
......
package de.codecentric.amuttsch.bahndelayinfo.utils
import scala.collection.mutable
object FiniteQueue {
implicit class FinitieQueue[A](q: mutable.Queue[A]) {
def enqueueFinite(elem: A, maxSize: Int): Unit = {
q.enqueue(elem)
while (q.size > maxSize) { q.dequeue() }
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment