Commit 2cee9878 authored by Andreas Muttscheller's avatar Andreas Muttscheller

Improve Lambdas

parent f36681ee
......@@ -20,7 +20,6 @@ libraryDependencies ++= Seq(
// xml and json
"org.scala-lang.modules" %% "scala-xml" % "1.1.1",
"com.google.code.gson" % "gson" % "2.8.5",
"org.json4s" %% "json4s-native" % "3.6.3",
// Kafka
......
......@@ -10,6 +10,7 @@ provider:
iamRoleStatements:
- Effect: Allow
Action:
- lambda:InvokeFunction
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
......@@ -35,15 +36,19 @@ functions:
ScheduledPlannedTimetableFetchService:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledPlannedTimetableFetchService::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
#events:
iamRoleStatementsInherit: true
events:
# Monday to Friday - once at 3am
#- schedule: cron(0 3 ? * MON-FRI *)
- schedule: cron(0 3 ? * MON-FRI *)
ScheduledPlannedTimetableFetchWorker:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledPlannedTimetableFetchWorker::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
ScheduledChangesFetchService:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledChangesFetchService::handleRequest
#events:
events:
# Monday to Friday - every 2 minutes - from 6-10 and 15-19
# - schedule: cron(0/2 6-10 ? * MON-FRI *)
# - schedule: cron(0/2 15-19 ? * MON-FRI *)
- schedule: cron(0/2 6-10 ? * MON-FRI *)
- schedule: cron(0/2 15-19 ? * MON-FRI *)
SlackChangeReporter:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SNSChangeSlackReporter::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
......@@ -51,10 +56,13 @@ functions:
- sns: ChangedTimetableStops
APISlackBotEventHandler:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandler::handleRequest
iamRoleStatementsInherit: true
events:
- http:
path: slack/event
method: post
APISlackBotEventHandlerWorker:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandlerWorker::handleRequest
resources:
Resources:
......
package de.codecentric.amuttsch.bahndelayinfo
object KafkaTopics {
val TOPIC_TIMETABLE_JSON = "timetable_json"
val TOPIC_TIMETABLE_CHANGES_JSON = "timetable_changes_json"
}
package de.codecentric.amuttsch.bahndelayinfo
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import slack.api.SlackApiClient
import slack.rtm.SlackRtmClient
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//object SlackBot extends App {
// val apiConf = ConfigFactory.load("api")
//
// implicit val system = ActorSystem("slack")
// val token = apiConf.getString("slack.token")
// val slackRtmClient = SlackRtmClient(token)
// val slackApiClient = SlackApiClient(token)
//
// slackRtmClient.onMessage { message =>
// println(s"User: ${message.user}, Message: ${message.text} - ${message.channel} - ${message.getClass.getSimpleName}")
// message.text.split("\"") match {
// case "register" :: params =>
// if (params.size != 3) {
// slackRtmClient.sendMessage(message.channel, "Wrong format. Use \"register <train regex> <station regex>")
// } else {
// try {
// val trainRegex = params.head.asInstanceOf[String].r
// val stationRegex = params(2).asInstanceOf[String].r
// ccr.registerSlackUser(message.channel, trainRegex, stationRegex)
// slackRtmClient.sendMessage(message.channel, "Registered!")
// } catch {
// case e: Exception =>
// e.printStackTrace()
// slackRtmClient.sendMessage(message.channel, "Invalid regex")
// }
// }
// case "unregister" =>
// ccr.unregisterSlackUser(message.channel)
// slackRtmClient.sendMessage(message.channel, "Unregistered!")
// case _ =>
// }
// }
//
// val result = Await.ready(f, Duration.Inf).value.get
//
// result match {
// case scala.util.Success(_) => println("finished successfully")
// case scala.util.Failure(exception) => throw exception
// }
//}
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws
import java.time.LocalDate
case class PlannedTimetableElement(
eva: String,
lastFetched: String
var eva: String,
var lastFetched: String
)
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.{APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import scala.collection.JavaConverters._
import slack.api.SlackApiClient
import scala.util.{Failure, Success, Try}
import de.codecentric.amuttsch.bahndelayinfo.aws
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import slack.api.BlockingSlackApiClient
class APISlackBotEventHandler {
implicit val logger: Logger = Logger(APISlackBotEventHandler.getClass)
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
implicit val system: ActorSystem = ActorSystem("slack")
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableSlackUsers: Table = ddb.getTable("SlackUsers")
val apiConf: Config = ConfigFactory.load("api")
val token: String = apiConf.getString("slack.token")
val slackApiClient: SlackApiClient = SlackApiClient(token)
val slackApiClient: BlockingSlackApiClient = BlockingSlackApiClient(token)
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
Try(parse(event.getBody)) match {
case Success(json) =>
logger.info(json.toString)
val lambdaClient = AWSLambdaClientBuilder.defaultClient
json \ "type" match {
case JString("url_verification") =>
urlVerification(json)
case _ =>
new APIGatewayProxyResponseEvent()
.withStatusCode(500)
.withBody("Don't know what to do")
}
case Failure(e) =>
logger.info(s"Invalid json body ${event.getBody}")
new APIGatewayProxyResponseEvent()
.withStatusCode(500)
.withBody(s"Invalid json body ${event.getBody}")
}
}
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("APISlackBotEventHandlerWorker"))
.withPayload(write(event.getBody))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
def urlVerification(json: JValue): APIGatewayProxyResponseEvent = {
new APIGatewayProxyResponseEvent()
.withStatusCode(200)
.withBody((json \ "challenge").extract[String])
.withHeaders(
Map(
"Content-type" -> "text/plain"
).asJava
)
}
}
object APISlackBotEventHandler
\ No newline at end of file
}
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.{APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.slackmsg.{Message, SlackUser, UrlVerification}
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import slack.api.BlockingSlackApiClient
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
class APISlackBotEventHandlerWorker {
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
implicit val system: ActorSystem = ActorSystem("slack")
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableSlackUsers: Table = ddb.getTable("SlackUsers")
val apiConf: Config = ConfigFactory.load("api")
val token: String = apiConf.getString("slack.token")
val slackApiClient: BlockingSlackApiClient = BlockingSlackApiClient(token)
def handleRequest(event: Object, context: Context): Unit = {
logger.info(s"event: $event")
Try(parse(event.toString)) match {
case Success(json) =>
logger.info(compact(render(json)))
json \ "type" match {
case JString("url_verification") =>
val urlVerificationMsg = json.extract[UrlVerification]
urlVerification(urlVerificationMsg)
case JString("event_callback") =>
logger.info(s"event_callback ${json \ "event"}")
json \ "event" \ "type" match {
case JString("message") =>
val msg = (json \ "event" ).extract[Message]
if (!(json \ "authed_users").extract[Array[String]].contains(msg.user)) {
println(msg)
parseMessage(msg)
}
case _ =>
}
case _ =>
logger.info(s"Dont know what to do")
}
case Failure(e) =>
logger.info(s"Invalid event $event")
}
}
def urlVerification(urlVerification: UrlVerification): APIGatewayProxyResponseEvent = {
new APIGatewayProxyResponseEvent()
.withStatusCode(200)
.withBody(urlVerification.challenge)
.withHeaders(
Map(
"Content-type" -> "text/plain"
).asJava
)
}
def parseMessage(message: Message): APIGatewayProxyResponseEvent = {
message.text.split("\"").map(_.trim).toList match {
case "register" :: params =>
if (params.size != 3) {
slackApiClient.postChatMessage(message.channel, "Wrong format. Use \"register <train regex> <station regex>")
} else {
try {
// Create regex to ensure proper syntax
val trainRegex = params.head.r
val stationRegex = params(2).r
val newSlackUser = SlackUser(
message.user,
message.channel,
trainRegex.toString(),
stationRegex.toString()
)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", message.user)
tableSlackUsers.putItem(item)
slackApiClient.postChatMessage(message.channel, "Registered!", asUser = Some(true))
} catch {
case e: Exception =>
e.printStackTrace()
slackApiClient.postChatMessage(message.channel, "Invalid regex", asUser = Some(true))
}
}
case "unregister" :: _ =>
tableSlackUsers.deleteItem("id", message.user)
slackApiClient.postChatMessage(message.channel, "Unregistered!", asUser = Some(true))
case tail =>
slackApiClient.postChatMessage(message.channel, s"Unknown command: $tail", asUser = Some(true))
}
new APIGatewayProxyResponseEvent()
.withStatusCode(200)
}
}
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, ScanFilter, Table}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import com.google.gson.Gson
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.reporter.SlackUser
import slack.api.SlackApiClient
import de.codecentric.amuttsch.bahndelayinfo.slackmsg.SlackUser
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import slack.api.BlockingSlackApiClient
class SNSChangeSlackReporter {
implicit val system: ActorSystem = ActorSystem("slack")
implicit val logger: Logger = Logger(SNSChangeSlackReporter.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val apiConf: Config = ConfigFactory.load("api")
val token: String = apiConf.getString("slack.token")
val slackApiClient: SlackApiClient = SlackApiClient(token)
val gson: Gson = new Gson()
val slackApiClient: BlockingSlackApiClient = BlockingSlackApiClient(token)
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
......@@ -33,50 +33,64 @@ class SNSChangeSlackReporter {
def handleRequest(event: SNSEvent, context: Context): Unit = {
val eventMessage = event.getRecords.get(0).getSNS.getMessage
logger.info(s"Received message $eventMessage")
val newDelayInformation = gson.fromJson(eventMessage, classOf[SNSNewDelayInformation])
val newDelayInformation = parse(eventMessage).extract[SNSNewDelayInformation]
parseDelayInformation(newDelayInformation)
}
def parseDelayInformation(newDelayInformation: SNSNewDelayInformation): Unit = {
implicit val system: ActorSystem = ActorSystem("slack")
var slackRecipients = Set.empty[SlackUser]
val slackUserItems = tableSlackUsers.scan()
slackUserItems.forEach { item =>
slackRecipients = slackRecipients + gson.fromJson(item.toJSON, classOf[SlackUser])
slackRecipients = slackRecipients + parse(item.toJSON).extract[SlackUser]
}
val scanFilter: ScanFilter = new ScanFilter("eva").eq(newDelayInformation.eva)
val stops = tableTimetableStops.scan(
scanFilter
)
logger.info(s"Slack Recipients:\n${slackRecipients.mkString("\n")}")
stops.forEach { stop =>
val tti = gson.fromJson(stop.toJSON, classOf[TimetableInformation])
if (tti.isHistory) {
return
}
val stops = tableTimetableStops.scan()
slackRecipients foreach { sr =>
if (tti.station.matches(sr.stationRegex) &&
tti.train.matches(sr.trainRegex)
) {
if (!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
aws.dynamoDBScanToScala(stops)
.map(item => TimetableInformation.fromJson(item.getJSON("tti")))
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 0)
.foreach { tti =>
slackRecipients foreach { sr =>
if (tti.station.matches(sr.stationRegex) &&
tti.train.matches(sr.trainRegex)
) {
sr.seenTimetableInformation.update(tti.id, tti)
sendDelayInformationToUser(sr, tti)
if (!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
) {
val resp = slackApiClient.postChatMessage(
sr.channel,
tti.toString
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}: $resp")
sr.seenTimetableInformation += (tti.id -> tti)
sr.seenTimetableInformation.retain((_, v) => !v.isHistory)
val item = Item
.fromJSON(write(sr))
.withPrimaryKey("id", sr.user)
tableSlackUsers.putItem(item)
}
}
}
}
}
}
def sendDelayInformationToUser(user: SlackUser, tti: TimetableInformation): Unit = {
logger.info(s"Sending messages to user for station ${tti.station}")
slackApiClient.postChatMessage(
user.user,
tti.toString
)
system.terminate()
}
}
object SNSChangeSlackReporter
\ No newline at end of file
object SNSChangeSlackReporter extends App {
val r = new SNSChangeSlackReporter()
r.parseDelayInformation(
SNSNewDelayInformation("8000105", 52)
)
}
\ No newline at end of file
......@@ -5,17 +5,20 @@ import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.google.gson.Gson
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.PlannedTimetableElement
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.{DBChangedTimetableFetcher, TimetableInformation}
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
class ScheduledChangesFetchService {
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledChangesFetchService.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val snsClient = AmazonSNSClientBuilder.defaultClient()
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
......@@ -23,8 +26,6 @@ class ScheduledChangesFetchService {
val snsChangedTimetableInformationTopicArn = snsClient.createTopic("ChangedTimetableStops").getTopicArn
val gson = new Gson()
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
......@@ -32,7 +33,7 @@ class ScheduledChangesFetchService {
// Download timetable
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, classOf[PlannedTimetableElement])
val element: PlannedTimetableElement = parse(plannedTimetableElementItem.toJSON).extract[PlannedTimetableElement]
val evaToFetch = List(element.eva)
val timetableInformations = dbChangedTimetableFetcher.fetchTimetablesForEvas(evaToFetch)
......@@ -40,19 +41,20 @@ class ScheduledChangesFetchService {
timetableInformations.foreach { tti =>
val item = tableTimetableStops.getItem("id", tti.id)
if (item != null) {
val ttiDb = gson.fromJson(item.toJSON, classOf[TimetableInformation])
val ttiDb = TimetableInformation.fromJson(item.getJSON("tti"))
val newTti = ttiDb.updateDelay(tti)
// Update entry in dynamodb
val newItem = Item
.fromJSON(gson.toJson(newTti))
val newItem = new Item()
.withPrimaryKey("id", newTti.id)
.withJSON("tti", newTti.toJson)
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(newItem)
}
}
val snsMessage = SNSNewDelayInformation(element.eva, timetableInformations.size)
snsClient.publish(snsChangedTimetableInformationTopicArn, gson.toJson(snsMessage))
snsClient.publish(snsChangedTimetableInformationTopicArn, write(snsMessage))
}
}
}
......
......@@ -4,49 +4,51 @@ import java.time.LocalDate
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, ScanFilter}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.google.gson.Gson
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.PlannedTimetableElement
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBPlannedTimetableFetcher
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import scala.collection.mutable.ListBuffer
class ScheduledPlannedTimetableFetchService {
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledPlannedTimetableFetchService.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbPlannedTimetableFetcher = new DBPlannedTimetableFetcher()
val gson = new Gson()
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
val sf: ScanFilter = new ScanFilter("lastFetched").ne(LocalDate.now.toString)
val evasToFetchItemCollection = tablePlannedTimetables.scan(
sf
)
// Download timetable
var evaToFetchList = ListBuffer.empty[PlannedTimetableElement]
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, classOf[PlannedTimetableElement])
val evaToFetch = List(element.eva.toString)
val timetableInformations = dbPlannedTimetableFetcher.fetchTimetablesForEvas(evaToFetch)
timetableInformations.foreach { tti =>
val item = Item
.fromJSON(gson.toJson(tti))
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(item)
}
val newElement = element.copy(lastFetched = LocalDate.now.toString)
tablePlannedTimetables.putItem(Item.fromJSON(gson.toJson(newElement)))
val element: PlannedTimetableElement = parse(plannedTimetableElementItem.toJSON).extract[PlannedTimetableElement]
evaToFetchList += element
}
val json = write(evaToFetchList.map(_.eva).toList)
logger.info(s"Sending $json")
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(json)
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
}
}
......