Commit c1d1ee26 authored by Andreas Muttscheller's avatar Andreas Muttscheller

Add importer

parent 2cee9878
......@@ -58,6 +58,7 @@ functions:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandler::handleRequest
iamRoleStatementsInherit: true
events:
- schedule: cron(0/2 6-18 ? * MON-FRI *)
- http:
path: slack/event
method: post
......@@ -66,6 +67,20 @@ functions:
resources:
Resources:
Stations:
Type: AWS::DynamoDB::Table
Properties:
TableName: Stations
AttributeDefinitions:
- AttributeName: eva
AttributeType: S
KeySchema:
- AttributeName: eva
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
BillingMode: PAY_PER_REQUEST
TimeTableStopTable:
Type: AWS::DynamoDB::Table
Properties:
......
#!/bin/bash
ENDPOINT=--endpoint-url http://localhost:8000
aws dynamodb create-table --table-name TimetableStops --attribute-definitions AttributeName=id,AttributeType=S --key-schema AttributeName=id,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 $ENDPOINT
aws dynamodb create-table --table-name SlackUsers --attribute-definitions AttributeName=id,AttributeType=S --key-schema AttributeName=id,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 $ENDPOINT
aws dynamodb create-table --table-name PlannedTimetables --attribute-definitions AttributeName=eva,AttributeType=N --key-schema AttributeName=eva,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 $ENDPOINT
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic timetable_json --partitions 3 --replication-factor 1
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic timetable_changes_json --partitions 3 --replication-factor 1
This diff is collapsed.
package de.codecentric.amuttsch.bahndelayinfo.aws
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.prefs.EmptyValueStrategy
import scala.io.Source
object StationImporter extends App {
implicit val jsonFormat: Formats = DefaultFormats.withEmptyValueStrategy(new EmptyValueStrategy {
def noneValReplacement = None
def replaceEmpty(value: JValue): JValue = value match {
case JString("") => JNothing
case JArray(items) => JArray(items map replaceEmpty)
case JObject(fields) => JObject(fields map {
case JField(name, v) => JField(name, replaceEmpty(v))
})
case oth => oth
}
})
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
val jsonResource = Source.fromResource("dbStations.json")
val jsonString = jsonResource.getLines().mkString("")
val json = parse(jsonString)
for {
JArray(objList) <- json
obj <- objList
} {
val jsonObj = compact(render(obj))
println(jsonObj)
tableStations.putItem(
Item
.fromJSON(jsonObj)
.withPrimaryKey("eva", (obj \ "EVA_NR").extract[String])
)
}
}
......@@ -3,10 +3,13 @@ package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.{APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.slackmsg.{Message, SlackUser, UrlVerification}
import org.json4s._
import org.json4s.native.JsonMethods._
......@@ -72,30 +75,34 @@ class APISlackBotEventHandlerWorker {
def parseMessage(message: Message): APIGatewayProxyResponseEvent = {
message.text.split("\"").map(_.trim).toList match {
case "register" :: params =>
if (params.size != 3) {
slackApiClient.postChatMessage(message.channel, "Wrong format. Use \"register <train regex> <station regex>")
} else {
try {
// Create regex to ensure proper syntax
val trainRegex = params.head.r
val stationRegex = params(2).r
val newSlackUser = SlackUser(
message.user,
message.channel,
trainRegex.toString(),
stationRegex.toString()
)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", message.user)
tableSlackUsers.putItem(item)
slackApiClient.postChatMessage(message.channel, "Registered!", asUser = Some(true))
} catch {
case e: Exception =>
e.printStackTrace()
slackApiClient.postChatMessage(message.channel, "Invalid regex", asUser = Some(true))
}
case "addStation" :: eva :: Nil =>
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(write(List(eva)))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
slackApiClient.postChatMessage(message.channel, s"Added station $eva!", asUser = Some(true))
case "register" :: train :: _ :: station :: Nil =>
try {
// Create regex to ensure proper syntax
val trainRegex = train.r
val stationRegex = station.r
val newSlackUser = SlackUser(
message.user,
message.channel,
trainRegex.toString(),
stationRegex.toString()
)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", message.user)
tableSlackUsers.putItem(item)
slackApiClient.postChatMessage(message.channel, "Registered!", asUser = Some(true))
} catch {
case e: Exception =>
e.printStackTrace()
slackApiClient.postChatMessage(message.channel, "Invalid regex", asUser = Some(true))
}
case "unregister" :: _ =>
tableSlackUsers.deleteItem("id", message.user)
......
......@@ -56,7 +56,7 @@ class SNSChangeSlackReporter {
.map(item => TimetableInformation.fromJson(item.getJSON("tti")))
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 0)
.filter(_.delayInMinutes > 2)
.foreach { tti =>
slackRecipients foreach { sr =>
if (tti.station.matches(sr.stationRegex) &&
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment