Commit b04a639b authored by Andreas Muttscheller's avatar Andreas Muttscheller

Refactoring

parent eaad0ab3
......@@ -28,10 +28,12 @@ libraryDependencies ++= Seq(
// AWS
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.475",
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.479",
"com.amazonaws" % "aws-java-sdk-lambda" % "1.11.479",
"com.amazonaws" % "aws-lambda-java-events" % "2.2.4",
"com.amazonaws" % "aws-lambda-java-core" % "1.2.0",
// Slack
// Slack
"com.github.slack-scala-client" %% "slack-scala-client" % "0.2.5",
)
libraryDependencies += {
......
......@@ -5,16 +5,23 @@ provider:
runtime: java8
region: us-east-1
timeout: 300 # 5 min timeout
stage: ${opt:stage, self:custom.default_stage}
iamRoleStatements:
- Effect: Allow
Action:
- dynamodb:PutItem
- sns:CreateTopic
- sns:Publish
Resource: '*'
vpc:
securityGroupIds:
- ${env:AWS_SG}
subnetIds:
- ${env:AWS_SUBNET}
environment:
IS_SERVERLESS: "true"
SERVERLESS_SERVICE: ${self:service}
SERVERLESS_STAGE: ${self:provider.stage}
# you can add packaging information here
......@@ -24,27 +31,28 @@ package:
artifact: target/scala-2.12/planned-timetable-fetcher.jar
custom:
kafkaBootstrapServers: 10.0.0.10:9092,10.0.2.122:9092,10.0.1.58:9092
default_stage: dev
functions:
TimeTableFetcher:
handler: de.codecentric.amuttsch.plannedtimetable.fetcher.TimetableFetcher::handleRequest
ScheduledPlannedTimetableFetchService:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledPlannedTimetableFetchService::handleRequest
events:
- schedule: cron(0 1 * * ? *)
environment:
KAFKA_BOOTSTRAP_SERVERS: ${self:custom.kafkaBootstrapServers}
TimetableRawToSeparatedStopsJson:
handler: de.codecentric.amuttsch.plannedtimetable.transform.TimetableRawToSeparatedStopsJson::handleRequest
# Monday to Friday - once at 3am
- schedule: cron(0 3 ? * MON-FRI *)
ScheduledChangesFetchService:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.ScheduledChangesFetchService::handleRequest
events:
- schedule: rate(10 minutes)
environment:
KAFKA_BOOTSTRAP_SERVERS: ${self:custom.kafkaBootstrapServers}
PersistTimetableStops:
handler: de.codecentric.amuttsch.plannedtimetable.persist.PersistTimetableStops::handleRequest
# Monday to Friday - every 2 minutes - from 6-10 and 15-19
- schedule: cron(0/2 6-10 ? * MON-FRI *)
- schedule: cron(0/2 15-19 ? * MON-FRI *)
SlackChangeReporter:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SlackChangeReporter::handleRequest
events:
- schedule: rate(10 minutes)
environment:
KAFKA_BOOTSTRAP_SERVERS: ${self:custom.kafkaBootstrapServers}
- sns: ChangedTimetableStops
APISlackBotEventHandler:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandler::handleRequest
events:
- http: slack
resources:
Resources:
......@@ -58,6 +66,23 @@ resources:
KeySchema:
- AttributeName: id
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
BillingMode: PAY_PER_REQUEST
PlannedTimetables:
Type: AWS::DynamoDB::Table
Properties:
TableName: TimetableStops
AttributeDefinitions:
- AttributeName: eva
AttributeType: S
KeySchema:
- AttributeName: eva
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
BillingMode: PAY_PER_REQUEST
SlackUsers:
Type: AWS::DynamoDB::Table
......@@ -69,4 +94,7 @@ resources:
KeySchema:
- AttributeName: id
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
BillingMode: PAY_PER_REQUEST
#!/bin/bash
ENDPOINT=--endpoint-url http://localhost:8000
aws dynamodb create-table --table-name TimetableStops --attribute-definitions AttributeName=id,AttributeType=S --key-schema AttributeName=id,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 $ENDPOINT
aws dynamodb create-table --table-name SlackUsers --attribute-definitions AttributeName=id,AttributeType=S --key-schema AttributeName=id,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 $ENDPOINT
aws dynamodb create-table --table-name PlannedTimetables --attribute-definitions AttributeName=eva,AttributeType=N --key-schema AttributeName=eva,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 $ENDPOINT
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic timetable_json --partitions 3 --replication-factor 1
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic timetable_changes_json --partitions 3 --replication-factor 1
package de.codecentric.amuttsch.plannedtimetable
package de.codecentric.amuttsch.bahndelayinfo
object KafkaTopics {
val TOPIC_TIMETABLE_RAW = "timetable_raw"
val TOPIC_TIMETABLE_JSON = "timetable_json"
val TOPIC_TIMETABLE_CHANGES_JSON = "timetable_changes_json"
}
package de.codecentric.amuttsch.bahndelayinfo
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import slack.api.SlackApiClient
import slack.rtm.SlackRtmClient
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//object SlackBot extends App {
// val apiConf = ConfigFactory.load("api")
//
// implicit val system = ActorSystem("slack")
// val token = apiConf.getString("slack.token")
// val slackRtmClient = SlackRtmClient(token)
// val slackApiClient = SlackApiClient(token)
//
// slackRtmClient.onMessage { message =>
// println(s"User: ${message.user}, Message: ${message.text} - ${message.channel} - ${message.getClass.getSimpleName}")
// message.text.split("\"") match {
// case "register" :: params =>
// if (params.size != 3) {
// slackRtmClient.sendMessage(message.channel, "Wrong format. Use \"register <train regex> <station regex>")
// } else {
// try {
// val trainRegex = params.head.asInstanceOf[String].r
// val stationRegex = params(2).asInstanceOf[String].r
// ccr.registerSlackUser(message.channel, trainRegex, stationRegex)
// slackRtmClient.sendMessage(message.channel, "Registered!")
// } catch {
// case e: Exception =>
// e.printStackTrace()
// slackRtmClient.sendMessage(message.channel, "Invalid regex")
// }
// }
// case "unregister" =>
// ccr.unregisterSlackUser(message.channel)
// slackRtmClient.sendMessage(message.channel, "Unregistered!")
// case _ =>
// }
// }
//
// val result = Await.ready(f, Duration.Inf).value.get
//
// result match {
// case scala.util.Success(_) => println("finished successfully")
// case scala.util.Failure(exception) => throw exception
// }
//}
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws
object APIGatewayResponse {
private val OK = 200
private val ERROR = 500
def fromCodeAndBody(status: Int, body: String): APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(status)
response.setBody(body)
response
}
def okWithBody(body: String): APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(OK)
response.setBody(body)
response
}
def ok: APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(OK)
response
}
def errorWithBody(body: String): APIGatewayResponse = {
val response = new APIGatewayResponse
response.setStatusCode(ERROR)
response.setBody(body)
response
}
}
class APIGatewayResponse() {
private var isBase64Encoded = false
private var statusCode = 0
private var headers = new Nothing
private var body = null
def isBase64Encoded: Boolean = isBase64Encoded
def setBase64Encoded(isBase64Encoded: Boolean): Unit = {
this.isBase64Encoded = isBase64Encoded
}
def getHeaders: Nothing = headers
def setHeaders(headers: Nothing): Unit = {
this.headers = headers
}
def getBody: String = body
def setBody(body: String): Unit = {
this.body = body
}
def getStatusCode: Int = statusCode
def setStatusCode(statusCode: Int): Unit = {
this.statusCode = statusCode
}
}
package de.codecentric.amuttsch.plannedtimetable.aws
package de.codecentric.amuttsch.bahndelayinfo.aws
import com.amazonaws.services.lambda.runtime.{ClientContext, CognitoIdentity, Context, LambdaLogger}
......
package de.codecentric.amuttsch.bahndelayinfo.aws
import java.util.Date
case class PlannedTimetableElement(
eva: String,
lastFetched: Date
)
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws.APIGatewayResponse
class APISlackBotEventHandler {
implicit val logger: Logger = Logger(APISlackBotEventHandler.getClass)
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayResponse = {
APIGatewayResponse.ok
}
}
object APISlackBotEventHandler
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.reporter.SlackUser
import org.json.JSONObject
import slack.api.SlackApiClient
class SNSChangeSlackReporter {
implicit val system: ActorSystem = ActorSystem("slack")
implicit val logger: Logger = Logger(SNSChangeSlackReporter.getClass)
val apiConf: Config = ConfigFactory.load("api")
val token = apiConf.getString("slack.token")
val slackApiClient = SlackApiClient(token)
val ddbClient = aws.getAwsDynamoDBClient(false)
val ddb = new DynamoDB(ddbClient)
def handleRequest(event: SNSEvent, context: Context): Unit = {
var slackRecipients = Set.empty[SlackUser]
val tableSlackUsers = ddb.getTable("SlackUsers")
val slackUserItems = tableSlackUsers.scan()
slackUserItems.forEach { item =>
val slackUserJson = new JSONObject(item.toJSON)
slackRecipients = slackRecipients + SlackUser.fromJSON(slackUserJson)
}
event.getRecords.forEach { r=>
val message = r.getSNS.getMessage
val tti = TimetableInformation.fromJSON(new JSONObject(message))
if (tti.isHistory) {
return
}
slackRecipients foreach { sr =>
if (tti.station.matches(sr.stationRegex) &&
tti.train.matches(sr.trainRegex)
) {
if (!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
) {
sr.seenTimetableInformation.update(tti.id, tti)
sendDelayInformationToUser(sr, tti)
}
}
}
}
}
def sendDelayInformationToUser(user: SlackUser, tti: TimetableInformation): Unit = {
logger.info(s"Sending messages to user for station ${tti.station}")
slackApiClient.postChatMessage(
user.user,
tti.toString
)
}
}
object SNSChangeSlackReporter
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.google.gson.Gson
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.PlannedTimetableElement
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBChangedTimetableFetcher
import de.codecentric.amuttsch.bahndelayinfo.utils.JSONImplicits._
import org.json.JSONObject
class ScheduledChangesFetchService {
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledChangesFetchService.getClass)
val snsClient = AmazonSNSClientBuilder.defaultClient()
val ddbClient = aws.getAwsDynamoDBClient(context)
val dbChangedTimetableFetcher = DBChangedTimetableFetcher()
val snsPlannedTimetableInformationTopicArn = snsClient.createTopic("ChangedTimetableStops").getTopicArn
val gson = new Gson()
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
val evasToFetchItemCollection = tablePlannedTimetables.scan()
// Download timetable
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, PlannedTimetableElement.getClass)
val evaToFetch = List(element.eva)
val timetableInformations = dbChangedTimetableFetcher.fetchTimetablesForEvas(evaToFetch)
timetableInformations.foreach { ttiJson =>
val item = tableTimetableStops.getItem("id", ttiJson.getString("id"))
if (item != null) {
val itemJson = new JSONObject(item.toJSON)
itemJson.deepMerge(ttiJson)
itemJson.deleteEmptyValues()
itemJson.put("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
// Update entry in dynamodb
val newItem = Item.fromJSON(itemJson.toString)
tableTimetableStops.putItem(newItem)
snsClient.publish(snsPlannedTimetableInformationTopicArn, newItem.toJSON)
}
}
}
}
}
object ScheduledChangesFetchService
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.google.gson.Gson
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.PlannedTimetableElement
import de.codecentric.amuttsch.bahndelayinfo.fetcher.DBPlannedTimetableFetcher
class ScheduledPlannedTimetableFetchService {
def handleRequest(event: ScheduledEvent, context: Context): Unit = {
implicit val logger: Logger = Logger(ScheduledPlannedTimetableFetchService.getClass)
val snsClient = AmazonSNSClientBuilder.defaultClient()
val ddbClient = aws.getAwsDynamoDBClient(context)
val dbPlannedTimetableFetcher = DBPlannedTimetableFetcher()
val snsPlannedTimetableInformationTopicArn = snsClient.createTopic("PlannedTimetableStops").getTopicArn
val gson = new Gson()
val ddb = new DynamoDB(ddbClient)
val tablePlannedTimetables = ddb.getTable("PlannedTimetables")
val tableTimetableStops = ddb.getTable("TimetableStops")
val evasToFetchItemCollection = tablePlannedTimetables.scan()
// Download timetable
evasToFetchItemCollection.forEach { plannedTimetableElementItem =>
val element: PlannedTimetableElement = gson.fromJson(plannedTimetableElementItem.toJSON, PlannedTimetableElement.getClass)
val evaToFetch = List(element.eva.toString)
val timetableInformations = dbPlannedTimetableFetcher.fetchTimetablesForEvas(evaToFetch)
timetableInformations.foreach { ttiJson =>
val item = Item.fromJSON(ttiJson.toString)
tableTimetableStops.putItem(item)
snsClient.publish(snsPlannedTimetableInformationTopicArn, ttiJson.toString)
}
}
}
}
object ScheduledPlannedTimetableFetchService
package de.codecentric.amuttsch.bahndelayinfo
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.Regions
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.typesafe.scalalogging.Logger
package object aws {
def getAwsDynamoDBClient(context: Context)(implicit logger: Logger): AmazonDynamoDB = {
getAwsDynamoDBClient(context.getAwsRequestId == "DummyContext")
}
def getAwsDynamoDBClient(local: Boolean)(implicit logger: Logger): AmazonDynamoDB = {
val builder = AmazonDynamoDBClientBuilder.standard
// Set the endpoint to localhost if we executed the function locally
if (local) {
logger.info(s"Using local dynamoDB")
builder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", Regions.EU_CENTRAL_1.getName)
)
}
builder.build
}
def getLambdaFunctionName(functionName: String): String = {
val service = scala.util.Properties.envOrElse("SERVERLESS_SERVICE", "")
val stage = scala.util.Properties.envOrElse("SERVERLESS_STAGE", "")
if (service.nonEmpty && stage.nonEmpty) {
s"$service-$stage-$functionName"
} else {
throw new RuntimeException(s"Environment variables SERVERLESS_SERVICE ($service) or SERVERLESS_STAGE ($stage) missing!")
}
}
}
package de.codecentric.amuttsch.bahndelayinfo.fetcher
import java.util.Properties
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.softwaremill.sttp._
import com.typesafe.config.Config
import de.codecentric.amuttsch.bahndelayinfo.utils.JSONImplicits._
import de.codecentric.amuttsch.bahndelayinfo.{KafkaTopics, aws}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json.JSONObject
class DBChangedTimetableFetcher extends DBFetcher {
override def fetchUrl(ttr: TimetableRequest)(implicit apiConf: Config): Uri = {
uri"${apiConf.getString("dbapi.url")}/timetables/v1/fchg/${ttr.eva}"
}
override def fetchTimetablesForEvas(evaToFetch: List[String]): Seq[JSONObject] = {
val timetableRequests = evaToFetch.map { eva =>
TimetableRequest(eva)
}
processTimetableRequests(timetableRequests)
}
}
/**
* Create topic:
* kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic timetable_changes_json --partitions 3 --replication-factor 1
*/
object DBChangedTimetableFetcher extends App {
def apply(): DBChangedTimetableFetcher = new DBChangedTimetableFetcher()
val props = {
val p = new Properties
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, scala.util.Properties.envOrElse("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092"))
p.put(ProducerConfig.CLIENT_ID_CONFIG, "changed-timetable-fetcher-fetcher")
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
p
}
val kafkaProducer = new KafkaProducer[String, String](props)
val ddbClient = aws.getAwsDynamoDBClient(local = true)
val ddb = new DynamoDB(ddbClient)
val tableTimetableStops = ddb.getTable("TimetableStops")
def saveStopChange(station: String, s: JSONObject): Unit = {
val item = tableTimetableStops.getItem("id", s.getString("id"))
if (item == null) return
val itemJson = new JSONObject(item.toJSON)
itemJson.deepMerge(s)
itemJson.deleteEmptyValues()
itemJson.put("station", station)
itemJson.put("ttl", System.currentTimeMillis() / 1000 + 24*60*60)
// Update entry in dynamodb
val newItem = Item.fromJSON(itemJson.toString)
tableTimetableStops.putItem(newItem)
// Publish complete stop information in kafka
kafkaProducer.send(new ProducerRecord[String, String](
KafkaTopics.TOPIC_TIMETABLE_CHANGES_JSON,
s.getString("id"),
itemJson.toString
))
}
kafkaProducer.close()
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = {
try {
DBChangedTimetableFetcher().fetchTimetablesForEvas(List("8001377", "8000105"))
} catch {
case e: Exception => e.printStackTrace()
}
}
}
val f = ex.scheduleAtFixedRate(task, 1, 60, TimeUnit.SECONDS)
ex.awaitTermination(10, TimeUnit.MINUTES)
}
\ No newline at end of file