Commit f36681ee authored by Andreas Muttscheller's avatar Andreas Muttscheller

Move to AWS lambda and SNS

parent b04a639b
......@@ -20,8 +20,8 @@ libraryDependencies ++= Seq(
// xml and json
"org.scala-lang.modules" %% "scala-xml" % "1.1.1",
"org.json" % "json" % "20180813",
"com.google.code.gson" % "gson" % "2.8.5",
"org.json4s" %% "json4s-native" % "3.6.3",
// Kafka
"org.apache.kafka" % "kafka-clients" % "2.1.0",
......
service: planned-timetable-fetcher
service: delay-info
provider:
name: aws
runtime: java8
region: us-east-1
region: eu-central-1
logRetentionInDays: 3
timeout: 300 # 5 min timeout
stage: ${opt:stage, self:custom.default_stage}
iamRoleStatements:
- Effect: Allow
Action:
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
- sns:CreateTopic
- sns:Publish
Resource: '*'
vpc:
securityGroupIds:
- ${env:AWS_SG}
subnetIds:
- ${env:AWS_SUBNET}
environment:
IS_SERVERLESS: "true"
SERVERLESS_SERVICE: ${self:service}
......@@ -36,23 +34,27 @@ custom:
functions:
ScheduledPlannedTimetableFetchService:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledPlannedTimetableFetchService::handleRequest
events:
reservedConcurrency: 1 # Only one instance may run at a time
#events:
# Monday to Friday - once at 3am
- schedule: cron(0 3 ? * MON-FRI *)
#- schedule: cron(0 3 ? * MON-FRI *)
ScheduledChangesFetchService:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledChangesFetchService::handleRequest
events:
#events:
# Monday to Friday - every 2 minutes - from 6-10 and 15-19
- schedule: cron(0/2 6-10 ? * MON-FRI *)
- schedule: cron(0/2 15-19 ? * MON-FRI *)
# - schedule: cron(0/2 6-10 ? * MON-FRI *)
# - schedule: cron(0/2 15-19 ? * MON-FRI *)
SlackChangeReporter:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SlackChangeReporter::handleRequest
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SNSChangeSlackReporter::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
events:
- sns: ChangedTimetableStops
APISlackBotEventHandler:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandler::handleRequest
events:
- http: slack
- http:
path: slack/event
method: post
resources:
Resources:
......@@ -73,7 +75,7 @@ resources:
PlannedTimetables:
Type: AWS::DynamoDB::Table
Properties:
TableName: TimetableStops
TableName: PlannedTimetables
AttributeDefinitions:
- AttributeName: eva
AttributeType: S
......@@ -87,7 +89,7 @@ resources:
SlackUsers:
Type: AWS::DynamoDB::Table
Properties:
TableName: TimetableStops
TableName: SlackUsers
AttributeDefinitions:
- AttributeName: id
AttributeType: S
......
package de.codecentric.amuttsch.bahndelayinfo.aws
object APIGatewayResponse {
private val OK = 200
private val ERROR = 500
def fromCodeAndBody(status: Int, body: String): APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(status)
response.setBody(body)
response
}
def okWithBody(body: String): APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(OK)
response.setBody(body)
response
}
def ok: APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(OK)
response
}
def errorWithBody(body: String): APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(ERROR)
response.setBody(body)
response
}
}
class APIGatewayResponse() {
private var isBase64Encoded = false
private var statusCode = 0
private var headers = new Nothing
private var body = null
def isBase64Encoded: Boolean = isBase64Encoded
def setBase64Encoded(isBase64Encoded: Boolean): Unit = {
this.isBase64Encoded = isBase64Encoded
}
def getHeaders: Nothing = headers
def setHeaders(headers: Nothing): Unit = {
this.headers = headers
}
def getBody: String = body
def setBody(body: String): Unit = {
this.body = body
}
def getStatusCode: Int = statusCode
def setStatusCode(statusCode: Int): Unit = {
this.statusCode = statusCode
}
}
package de.codecentric.amuttsch.bahndelayinfo.aws
import java.util.Date
import java.time.LocalDate
case class PlannedTimetableElement(
eva: String,
lastFetched: Date
lastFetched: String
)
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent
import com.amazonaws.services.lambda.runtime.events.{APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws.APIGatewayResponse
import scala.collection.JavaConverters._
import slack.api.SlackApiClient
import scala.util.{Failure, Success, Try}
import org.json4s._
import org.json4s.native.JsonMethods._
class APISlackBotEventHandler {
implicit val logger: Logger = Logger(APISlackBotEventHandler.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
val apiConf: Config = ConfigFactory.load("api")
val token: String = apiConf.getString("slack.token")
val slackApiClient: SlackApiClient = SlackApiClient(token)
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayResponse = {
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
Try(parse(event.getBody)) match {
case Success(json) =>
logger.info(json.toString)
json \ "type" match {
case JString("url_verification") =>
urlVerification(json)
case _ =>
new APIGatewayProxyResponseEvent()
.withStatusCode(500)
.withBody("Don't know what to do")
}
case Failure(e) =>
logger.info(s"Invalid json body ${event.getBody}")
new APIGatewayProxyResponseEvent()
.withStatusCode(500)
.withBody(s"Invalid json body ${event.getBody}")
}
}
APIGatewayResponse.ok
def urlVerification(json: JValue): APIGatewayProxyResponseEvent = {
new APIGatewayProxyResponseEvent()
.withStatusCode(200)
.withBody((json \ "challenge").extract[String])
.withHeaders(
Map(
"Content-type" -> "text/plain"
).asJava
)
}
}
......
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, ScanFilter, Table}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import com.google.gson.Gson
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.reporter.SlackUser
import org.json.JSONObject
import slack.api.SlackApiClient
class SNSChangeSlackReporter {
......@@ -17,25 +19,36 @@ class SNSChangeSlackReporter {
implicit val logger: Logger = Logger(SNSChangeSlackReporter.getClass)
val apiConf: Config = ConfigFactory.load("api")
val token = apiConf.getString("slack.token")
val slackApiClient = SlackApiClient(token)
val token: String = apiConf.getString("slack.token")
val slackApiClient: SlackApiClient = SlackApiClient(token)
val ddbClient = aws.getAwsDynamoDBClient(false)
val ddb = new DynamoDB(ddbClient)
val gson: Gson = new Gson()
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableSlackUsers: Table = ddb.getTable("SlackUsers")
val tableTimetableStops: Table = ddb.getTable("TimetableStops")
def handleRequest(event: SNSEvent, context: Context): Unit = {
val eventMessage = event.getRecords.get(0).getSNS.getMessage
logger.info(s"Received message $eventMessage")
val newDelayInformation = gson.fromJson(eventMessage, classOf[SNSNewDelayInformation])
var slackRecipients = Set.empty[SlackUser]
val tableSlackUsers = ddb.getTable("SlackUsers")
val slackUserItems = tableSlackUsers.scan()
slackUserItems.forEach { item =>
val slackUserJson = new JSONObject(item.toJSON)
slackRecipients = slackRecipients + SlackUser.fromJSON(slackUserJson)
slackRecipients = slackRecipients + gson.fromJson(item.toJSON, classOf[SlackUser])
}
event.getRecords.forEach { r=>
val message = r.getSNS.getMessage
val tti = TimetableInformation.fromJSON(new JSONObject(message))
val scanFilter: ScanFilter = new ScanFilter("eva").eq(newDelayInformation.eva)
val stops = tableTimetableStops.scan(
scanFilter
)
stops.forEach { stop =>
val tti = gson.fromJson(stop.toJSON, classOf[TimetableInformation])
if (tti.isHistory) {
return
}
......
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
......@@ -8,9 +9,8 @@ import com.google.gson.Gson
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.PlannedTimetableElement
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBChangedTimetableFetcher
import de.codecentric.amuttsch.bahndelayinfo.utils.JSONImplicits._
import org.json.JSONObject
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.{DBChangedTimetableFetcher, TimetableInformation}
class ScheduledChangesFetchService {
......@@ -18,10 +18,10 @@ class ScheduledChangesFetchService {
implicit val logger: Logger = Logger(ScheduledChangesFetchService.getClass)
val snsClient = AmazonSNSClientBuilder.defaultClient()
val ddbClient = aws.getAwsDynamoDBClient(context)
val dbChangedTimetableFetcher = DBChangedTimetableFetcher()
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbChangedTimetableFetcher = new DBChangedTimetableFetcher()
val snsPlannedTimetableInformationTopicArn = snsClient.createTopic("ChangedTimetableStops").getTopicArn
val snsChangedTimetableInformationTopicArn = snsClient.createTopic("ChangedTimetableStops").getTopicArn
val gson = new Gson()
......@@ -32,27 +32,27 @@ class ScheduledChangesFetchService {
// Download timetable
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, PlannedTimetableElement.getClass)
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, classOf[PlannedTimetableElement])
val evaToFetch = List(element.eva)
val timetableInformations = dbChangedTimetableFetcher.fetchTimetablesForEvas(evaToFetch)
timetableInformations.foreach { ttiJson =>
val item = tableTimetableStops.getItem("id", ttiJson.getString("id"))
timetableInformations.foreach { tti =>
val item = tableTimetableStops.getItem("id", tti.id)
if (item != null) {
val itemJson = new JSONObject(item.toJSON)
itemJson.deepMerge(ttiJson)
itemJson.deleteEmptyValues()
itemJson.put("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
val ttiDb = gson.fromJson(item.toJSON, classOf[TimetableInformation])
val newTti = ttiDb.updateDelay(tti)
// Update entry in dynamodb
val newItem = Item.fromJSON(itemJson.toString)
val newItem = Item
.fromJSON(gson.toJson(newTti))
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(newItem)
snsClient.publish(snsPlannedTimetableInformationTopicArn, newItem.toJSON)
}
}
val snsMessage = SNSNewDelayInformation(element.eva, timetableInformations.size)
snsClient.publish(snsChangedTimetableInformationTopicArn, gson.toJson(snsMessage))
}
}
}
......
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import java.time.LocalDate
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, ScanFilter}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.google.gson.Gson
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
......@@ -15,32 +17,35 @@ class ScheduledPlannedTimetableFetchService {
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledPlannedTimetableFetchService.getClass)
val snsClient = AmazonSNSClientBuilder.defaultClient()
val ddbClient = aws.getAwsDynamoDBClient(context)
val dbPlannedTimetableFetcher = DBPlannedTimetableFetcher()
val snsPlannedTimetableInformationTopicArn = snsClient.createTopic("PlannedTimetableStops").getTopicArn
val ddbClient = AmazonDynamoDBClientBuilder.standard.build
val dbPlannedTimetableFetcher = new DBPlannedTimetableFetcher()
val gson = new Gson()
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
val evasToFetchItemCollection = tablePlannedTimetables.scan()
val sf: ScanFilter = new ScanFilter("lastFetched").ne(LocalDate.now.toString)
val evasToFetchItemCollection = tablePlannedTimetables.scan(
sf
)
// Download timetable
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, PlannedTimetableElement.getClass)
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, classOf[PlannedTimetableElement])
val evaToFetch = List(element.eva.toString)
val timetableInformations = dbPlannedTimetableFetcher.fetchTimetablesForEvas(evaToFetch)
timetableInformations.foreach { ttiJson =>
val item = Item.fromJSON(ttiJson.toString)
timetableInformations.foreach { tti =>
val item = Item
.fromJSON(gson.toJson(tti))
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(item)
snsClient.publish(snsPlannedTimetableInformationTopicArn, ttiJson.toString)
}
val newElement = element.copy(lastFetched = LocalDate.now.toString)
tablePlannedTimetables.putItem(Item.fromJSON(gson.toJson(newElement)))
}
}
}
......
package de.codecentric.amuttsch.bahndelayinfo
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.Regions
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.typesafe.scalalogging.Logger
package object aws {
def getAwsDynamoDBClient(context: Context)(implicit logger: Logger): AmazonDynamoDB = {
getAwsDynamoDBClient(context.getAwsRequestId == "DummyContext")
}
def getAwsDynamoDBClient(local: Boolean)(implicit logger: Logger): AmazonDynamoDB = {
val builder = AmazonDynamoDBClientBuilder.standard
// Set the endpoint to localhost if we executed the function locally
if (local) {
logger.info(s"Using local dynamoDB")
builder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", Regions.EU_CENTRAL_1.getName)
)
}
builder.build
}
def getLambdaFunctionName(functionName: String): String = {
val service = scala.util.Properties.envOrElse("SERVERLESS_SERVICE", "")
val stage = scala.util.Properties.envOrElse("SERVERLESS_STAGE", "")
......
package de.codecentric.amuttsch.bahndelayinfo.aws.sns
sealed trait SNSMessage
case class SNSNewDelayInformation(
eva: String,
amount: Int
) extends SNSMessage
package de.codecentric.amuttsch.bahndelayinfo.fetcher
import java.util.Properties
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.softwaremill.sttp._
import com.typesafe.config.Config
import de.codecentric.amuttsch.bahndelayinfo.utils.JSONImplicits._
import de.codecentric.amuttsch.bahndelayinfo.{KafkaTopics, aws}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json.JSONObject
class DBChangedTimetableFetcher extends DBFetcher {
......@@ -18,7 +9,7 @@ class DBChangedTimetableFetcher extends DBFetcher {
uri"${apiConf.getString("dbapi.url")}/timetables/v1/fchg/${ttr.eva}"
}
override def fetchTimetablesForEvas(evaToFetch: List[String]): Seq[JSONObject] = {
override def fetchTimetablesForEvas(evaToFetch: List[String]): Seq[TimetableInformation] = {
val timetableRequests = evaToFetch.map { eva =>
TimetableRequest(eva)
}
......@@ -26,65 +17,3 @@ class DBChangedTimetableFetcher extends DBFetcher {
processTimetableRequests(timetableRequests)
}
}
/**
* Create topic:
* kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic timetable_changes_json --partitions 3 --replication-factor 1
*/
object DBChangedTimetableFetcher extends App {
def apply(): DBChangedTimetableFetcher = new DBChangedTimetableFetcher()
val props = {
val p = new Properties
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, scala.util.Properties.envOrElse("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092"))
p.put(ProducerConfig.CLIENT_ID_CONFIG, "changed-timetable-fetcher-fetcher")
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
p
}
val kafkaProducer = new KafkaProducer[String, String](props)
val ddbClient = aws.getAwsDynamoDBClient(local = true)
val ddb = new DynamoDB(ddbClient)
val tableTimetableStops = ddb.getTable("TimetableStops")
def saveStopChange(station: String, s: JSONObject): Unit = {
val item = tableTimetableStops.getItem("id", s.getString("id"))
if (item == null) return
val itemJson = new JSONObject(item.toJSON)
itemJson.deepMerge(s)
itemJson.deleteEmptyValues()
itemJson.put("station", station)
itemJson.put("ttl", System.currentTimeMillis() / 1000 + 24*60*60)
// Update entry in dynamodb
val newItem = Item.fromJSON(itemJson.toString)
tableTimetableStops.putItem(newItem)
// Publish complete stop information in kafka
kafkaProducer.send(new ProducerRecord[String, String](
KafkaTopics.TOPIC_TIMETABLE_CHANGES_JSON,
s.getString("id"),
itemJson.toString
))
}
kafkaProducer.close()
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = {
try {
DBChangedTimetableFetcher().fetchTimetablesForEvas(List("8001377", "8000105"))
} catch {
case e: Exception => e.printStackTrace()
}
}
}
val f = ex.scheduleAtFixedRate(task, 1, 60, TimeUnit.SECONDS)
ex.awaitTermination(10, TimeUnit.MINUTES)
}
\ No newline at end of file
......@@ -3,11 +3,9 @@ package de.codecentric.amuttsch.bahndelayinfo.fetcher
import com.softwaremill.sttp.{HttpURLConnectionBackend, Id, SttpBackend, sttp, _}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.utils.JSONImplicits._
import org.json.{JSONObject, XML}
import org.xml.sax.SAXParseException
import scala.collection.JavaConverters._
import scala.xml.XML
private[fetcher] case class TimetableRequest(eva: String, date: String = "", hour: String = "")
......@@ -17,15 +15,15 @@ private[fetcher] trait DBFetcher {
private implicit val backend: SttpBackend[Id, Nothing] = HttpURLConnectionBackend()
implicit val apiConf: Config = ConfigFactory.load("api")
implicit val logger: Logger = Logger(DBPlannedTimetableFetcher.getClass)
implicit val logger: Logger = Logger(this.getClass)
def fetchUrl(ttr: TimetableRequest)(implicit apiConf: Config): Uri
def fetchTimetablesForEvas(evas: List[String]): Seq[JSONObject]
def fetchTimetablesForEvas(evas: List[String]): Seq[TimetableInformation]
private[fetcher] def processTimetableRequests(
timetableRequests: List[TimetableRequest]
): Seq[JSONObject] = {
): Seq[TimetableInformation] = {
val rateLimitSleepMs = 60 / apiConf.getInt("dbapi.timetable.ratelimit_per_minute") * 1000
timetableRequests
......@@ -35,15 +33,10 @@ private[fetcher] trait DBFetcher {
fetchChanges(ttr)
}
.filter(_.isValid)
.map(ttr => XML.toJSONObject(ttr.responseBody))
.filter(ttr => ttr.has("timetable") && ttr.getJSONObject