Commit 1442eb42 authored by Andreas Muttscheller's avatar Andreas Muttscheller

Add ElasticSearch

parent c1d1ee26
......@@ -32,8 +32,13 @@ libraryDependencies ++= Seq(
"com.amazonaws" % "aws-lambda-java-events" % "2.2.4",
"com.amazonaws" % "aws-lambda-java-core" % "1.2.0",
// Slack
// Slack
"com.github.slack-scala-client" %% "slack-scala-client" % "0.2.5",
// ElasicSearch
"com.sksamuel.elastic4s" % "elastic4s-core_2.12" % "6.3.8",
"com.sksamuel.elastic4s" %% "elastic4s-http" % "6.3.8",
"com.sksamuel.elastic4s" %% "elastic4s-aws" % "6.3.8",
)
libraryDependencies += {
sys.props += "packaging.type" -> "jar"
......
......@@ -16,11 +16,23 @@ provider:
- dynamodb:PutItem
- sns:CreateTopic
- sns:Publish
- es:ESHttpPost
- es:ESHttpPut
- es:ESHttpDelete
- es:ESHttpGet
Resource: '*'
environment:
IS_SERVERLESS: "true"
SERVERLESS_SERVICE: ${self:service}
SERVERLESS_STAGE: ${self:provider.stage}
ELASTICSEARCH_URL:
Fn::GetAtt: ["DelayInformationSearch", "DomainEndpoint"]
APIGATEWAY_URL:
Fn::Join:
- ""
- - "https://"
- Ref: "ApiGatewayRestApi"
- ".execute-api.${self:provider.region}.amazonaws.com/${self:provider.stage}"
# you can add packaging information here
......@@ -64,6 +76,13 @@ functions:
method: post
APISlackBotEventHandlerWorker:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APISlackBotEventHandlerWorker::handleRequest
APIStationQuery:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.APIStationQuery::handleRequest
iamRoleStatementsInherit: true
events:
- http:
path: stations
method: get
resources:
Resources:
......@@ -73,14 +92,20 @@ resources:
TableName: Stations
AttributeDefinitions:
- AttributeName: eva
AttributeType: N
- AttributeName: name
AttributeType: S
KeySchema:
- AttributeName: eva
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
BillingMode: PAY_PER_REQUEST
GlobalSecondaryIndexes:
- IndexName: NameIndex
KeySchema:
- AttributeName: name
KeyType: HASH
Projection:
ProjectionType: ALL
TimeTableStopTable:
Type: AWS::DynamoDB::Table
Properties:
......@@ -123,3 +148,28 @@ resources:
AttributeName: ttl
Enabled: true
BillingMode: PAY_PER_REQUEST
DelayInformationSearch:
Type: "AWS::Elasticsearch::Domain"
Properties:
ElasticsearchVersion: "6.3"
DomainName: "bahn-delay-information"
ElasticsearchClusterConfig:
DedicatedMasterEnabled: false
InstanceCount: "1"
ZoneAwarenessEnabled: false
InstanceType: "t2.small.elasticsearch"
EBSOptions:
EBSEnabled: true
Iops: 0
VolumeSize: 10
VolumeType: "gp2"
AccessPolicies:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
AWS: "*"
Action: "es:*"
Resource: "*"
AdvancedOptions:
rest.action.multi.allow_explicit_index: "true"
\ No newline at end of file
This diff is collapsed.
......@@ -2,45 +2,58 @@ package de.codecentric.amuttsch.bahndelayinfo.aws
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import de.codecentric.amuttsch.bahndelayinfo.aws
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.prefs.EmptyValueStrategy
import org.json4s.native.Serialization.write
import scala.io.Source
case class Station(
eva: Int,
ds100: String,
name: String,
latitude: String,
longitude: String,
traffic: String,
ifopt: String
)
object StationImporter extends App {
implicit val jsonFormat: Formats = DefaultFormats.withEmptyValueStrategy(new EmptyValueStrategy {
def noneValReplacement = None
def replaceEmpty(value: JValue): JValue = value match {
case JString("") => JNothing
case JArray(items) => JArray(items map replaceEmpty)
case JObject(fields) => JObject(fields map {
case JField(name, v) => JField(name, replaceEmpty(v))
})
case oth => oth
}
})
import com.sksamuel.elastic4s.http.ElasticDsl._
private val customSerializer = new CustomSerializer[String](_ => (
{ case JString(s) => s },
{ case "" => JNothing case s: String => JString(s) }
))
private implicit val jsonFormats: Formats = DefaultFormats + customSerializer
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
val esClient = aws.elasticSearchClient()
esClient.execute {
createIndex("stations")
}.await
val jsonResource = Source.fromResource("dbStations.json")
val jsonString = jsonResource.getLines().mkString("")
val json = parse(jsonString)
val json = parse(jsonString).extract[List[Station]]
for {
JArray(objList) <- json
obj <- objList
} {
val jsonObj = compact(render(obj))
json foreach { obj =>
val jsonObj = write(obj)
println(jsonObj)
esClient.execute {
indexInto("stations", "station").doc(jsonObj)
}.await
tableStations.putItem(
Item
.fromJSON(jsonObj)
.withPrimaryKey("eva", (obj \ "EVA_NR").extract[String])
)
}
println("Done")
esClient.close()
}
......@@ -6,11 +6,13 @@ import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBu
import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{InvocationType, InvokeRequest}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.{APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent}
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent
import com.softwaremill.sttp._
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.slackmsg.{Message, SlackUser, UrlVerification}
import de.codecentric.amuttsch.bahndelayinfo.aws.Station
import de.codecentric.amuttsch.bahndelayinfo.slackmsg.{DelayRegistration, Message, SlackUser, UrlVerification}
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
......@@ -23,6 +25,7 @@ class APISlackBotEventHandlerWorker {
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
implicit val system: ActorSystem = ActorSystem("slack")
implicit val backend: SttpBackend[Id, Nothing] = HttpURLConnectionBackend()
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
......@@ -75,38 +78,14 @@ class APISlackBotEventHandlerWorker {
def parseMessage(message: Message): APIGatewayProxyResponseEvent = {
message.text.split("\"").map(_.trim).toList match {
case "addStation" :: eva :: Nil =>
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(write(List(eva)))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
slackApiClient.postChatMessage(message.channel, s"Added station $eva!", asUser = Some(true))
case "searchStation" :: station :: Nil =>
searchStation(message, station)
case "addStation" :: station :: Nil =>
addStation(message, station)
case "register" :: train :: _ :: station :: Nil =>
try {
// Create regex to ensure proper syntax
val trainRegex = train.r
val stationRegex = station.r
val newSlackUser = SlackUser(
message.user,
message.channel,
trainRegex.toString(),
stationRegex.toString()
)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", message.user)
tableSlackUsers.putItem(item)
slackApiClient.postChatMessage(message.channel, "Registered!", asUser = Some(true))
} catch {
case e: Exception =>
e.printStackTrace()
slackApiClient.postChatMessage(message.channel, "Invalid regex", asUser = Some(true))
}
registerUser(message, train, station)
case "unregister" :: _ =>
tableSlackUsers.deleteItem("id", message.user)
slackApiClient.postChatMessage(message.channel, "Unregistered!", asUser = Some(true))
unregisterUser(message)
case tail =>
slackApiClient.postChatMessage(message.channel, s"Unknown command: $tail", asUser = Some(true))
}
......@@ -114,5 +93,76 @@ class APISlackBotEventHandlerWorker {
new APIGatewayProxyResponseEvent()
.withStatusCode(200)
}
private def searchStation(message: Message, station: String): Unit = {
val stations = getListOfStations(station).map(_.name)
slackApiClient.postChatMessage(message.channel, stations.mkString("\n"), asUser = Some(true))
}
private def getListOfStations(station: String): List[Station] = {
val apiGatewayUrl = scala.util.Properties.envOrElse("APIGATEWAY_URL", "")
val response = sttp
.get(uri"$apiGatewayUrl/stations/?q=$station")
.send()
response.body match {
case Left(errorMessage) =>
logger.error(errorMessage)
List.empty[Station]
case Right(body) =>
parse(body).extract[List[Station]]
}
}
private def registerUser(message: Message, train: String, station: String): Unit = {
try {
val user = getOrCreateUser(message)
// Create regex to ensure proper syntax
// TODO Lookup stations and get eva
val delayRegistration: DelayRegistration = DelayRegistration(train, station)
val newSlackUser = user.copy(
delayRegistration = user.delayRegistration + delayRegistration
)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", message.user)
tableSlackUsers.putItem(item)
slackApiClient.postChatMessage(message.channel, "Registered!", asUser = Some(true))
} catch {
case e: Exception =>
e.printStackTrace()
slackApiClient.postChatMessage(message.channel, "Invalid regex", asUser = Some(true))
}
}
private def unregisterUser(message: Message): Unit = {
tableSlackUsers.deleteItem("id", message.user)
slackApiClient.postChatMessage(message.channel, "Unregistered!", asUser = Some(true))
}
private def addStation(message: Message, eva: String): Unit = {
val lambdaClient = AWSLambdaClientBuilder.defaultClient
val rq = new InvokeRequest()
.withFunctionName(aws.getLambdaFunctionName("ScheduledPlannedTimetableFetchWorker"))
.withPayload(write(List(eva)))
rq.setInvocationType(InvocationType.Event)
lambdaClient.invoke(rq)
slackApiClient.postChatMessage(message.channel, s"Added station $eva!", asUser = Some(true))
}
def getOrCreateUser(message: Message): SlackUser = {
tableSlackUsers.getItem("id", message.user) match {
case item: Item =>
parse(item.toJSON()).extract[SlackUser]
case null =>
val newSlackUser = SlackUser(message.user, message.channel)
tableSlackUsers.putItem(Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", message.user))
newSlackUser
}
}
}
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.{APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent}
import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.http.{RequestFailure, RequestSuccess}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import scala.collection.JavaConverters._
class APIStationQuery {
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
implicit val system: ActorSystem = ActorSystem("slack")
import com.sksamuel.elastic4s.http.ElasticDsl._
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
val ddb: DynamoDB = new DynamoDB(ddbClient)
val tableStations: Table = ddb.getTable("Stations")
def handleRequest(event: APIGatewayProxyRequestEvent, context: Context): APIGatewayProxyResponseEvent = {
val queryParams = event.getQueryStringParameters
if (queryParams.containsKey("eva")) {
try {
val eva = queryParams.get("eva").toInt
logger.info(s"Getting eva $eva")
val stationJson = tableStations.getItem("eva", eva).toJSON
new APIGatewayProxyResponseEvent()
.withBody(stationJson)
.withStatusCode(200)
} catch {
case _: NumberFormatException =>
new APIGatewayProxyResponseEvent()
.withBody(s"Invalid eva format ${queryParams.get("eva")}")
.withStatusCode(400)
case _: NullPointerException =>
new APIGatewayProxyResponseEvent()
.withBody(s"Eva ${queryParams.get("eva")} not found")
.withStatusCode(400)
case e: Exception =>
logger.error("Invalid parameter", e)
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
}
} else if (queryParams.containsKey("q")) {
try {
val esClient = aws.elasticSearchClient()
val resp = esClient.execute {
search("stations") matchQuery("name", queryParams.get("q"))
}.await
esClient.close()
resp match {
case failure: RequestFailure =>
logger.error(failure.error.toString)
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
case results: RequestSuccess[SearchResponse] =>
new APIGatewayProxyResponseEvent()
.withBody(write(results.result.hits.hits.map(r => parse(r.sourceAsString))))
.withStatusCode(200)
}
} catch {
case _: NumberFormatException =>
new APIGatewayProxyResponseEvent()
.withBody(s"Invalid eva format ${queryParams.get("eva")}")
.withStatusCode(400)
case _: NullPointerException =>
new APIGatewayProxyResponseEvent()
.withBody(s"Eva ${queryParams.get("eva")} not found")
.withStatusCode(400)
case e: Exception =>
logger.error("Invalid parameter", e)
new APIGatewayProxyResponseEvent()
.withBody(s"Error")
.withStatusCode(400)
}
} else {
new APIGatewayProxyResponseEvent()
.withBody("Not found")
.withStatusCode(404)
}
}
}
object APIStationQuery extends App {
val asq = new APIStationQuery()
val request = new APIGatewayProxyRequestEvent()
.withQueryStringParamters(Map("q" -> "Darmstadt").asJava)
val resp = asq.handleRequest(request, null)
println(s"$resp")
asq.system.terminate()
}
\ No newline at end of file
......@@ -16,6 +16,8 @@ import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import slack.api.BlockingSlackApiClient
import de.codecentric.amuttsch.bahndelayinfo.utils.TraversableImplicits._
class SNSChangeSlackReporter {
implicit val logger: Logger = Logger(SNSChangeSlackReporter.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
......@@ -57,31 +59,27 @@ class SNSChangeSlackReporter {
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 2)
.foreach { tti =>
slackRecipients foreach { sr =>
if (tti.station.matches(sr.stationRegex) &&
tti.train.matches(sr.trainRegex)
) {
if (!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
) {
val resp = slackApiClient.postChatMessage(
sr.channel,
tti.toString
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}: $resp")
sr.seenTimetableInformation += (tti.id -> tti)
sr.seenTimetableInformation.retain((_, v) => !v.isHistory)
val item = Item
.fromJSON(write(sr))
.withPrimaryKey("id", sr.user)
tableSlackUsers.putItem(item)
}
}
}
.cross(slackRecipients)
.filter { case (tti, sr) => sr.delayRegistration.exists(d => d.eva == tti.eva && tti.train.matches(d.trainRegex)) }
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
}
.foreach { case (tti, sr) =>
val resp = slackApiClient.postChatMessage(
sr.channel,
tti.toString
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}: $resp")
sr.seenTimetableInformation += (tti.id -> tti)
sr.seenTimetableInformation.retain((_, v) => !v.isHistory)
val item = Item
.fromJSON(write(sr))
.withPrimaryKey("id", sr.user)
tableSlackUsers.putItem(item)
}
system.terminate()
......
package de.codecentric.amuttsch.bahndelayinfo
import com.amazonaws.services.dynamodbv2.document.{Item, ItemCollection, ScanOutcome}
import com.sksamuel.elastic4s.aws.Aws4ElasticClient
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
import scala.collection.mutable
......@@ -22,4 +24,11 @@ package object aws {
res
}
def elasticSearchClient(): ElasticClient = {
val esEndpointUrl = "https://"+scala.util.Properties.envOrElse("ELASTICSEARCH_URL", "").split("/")(0)+":80"
val esProperties = ElasticProperties(esEndpointUrl).endpoints.head
val esEndpoint = s"${esProperties.protocol}://${esProperties.host}:${esProperties.port}"
Aws4ElasticClient(esEndpoint)
}
}
package de.codecentric.amuttsch.bahndelayinfo.fetcher
import com.softwaremill.sttp.{HttpURLConnectionBackend, Id, SttpBackend, sttp, _}
import com.softwaremill.sttp._
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import org.xml.sax.SAXParseException
......
......@@ -4,10 +4,14 @@ import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
import scala.collection.mutable
case class DelayRegistration(
trainRegex: String,
eva: String
)
case class SlackUser(
user: String,
channel: String,
trainRegex: String,
stationRegex: String,
delayRegistration: Set[DelayRegistration] = Set.empty,
seenTimetableInformation: mutable.Map[String, TimetableInformation] = mutable.Map.empty[String, TimetableInformation]
)
package de.codecentric.amuttsch.bahndelayinfo.utils
object StringDistances {
implicit class StringDistanceImplicits(s: String) {
def damerauLevenshtein(s2: String): Int = StringDistances.damerauLevenshtein(s, s2)
}
def damerauLevenshtein(a: String, b: String): Int = {
val (aLength, bLength) = (a.length, b.length)
(aLength, bLength) match {
case (0, _) => bLength
case (_, 0) => aLength
case (_, _) => {
val dist = Array.ofDim[Int](aLength + 1, bLength + 1)
(0 to aLength).foreach { i => dist(i)(0) = i }
(0 to bLength).foreach { j => dist(0)(j) = j }
(1 to aLength).foreach {
i => {
(1 to bLength).foreach {
j => {
val cost = if (a(i - 1) == b(j - 1)) 0 else 1
dist(i)(j) = Math.min(Math.min(dist(i-1)(j) + 1, dist(i)(j-1) + 1), dist(i -1)(j - 1) + cost)
if (i > 1 && j > 1 && a(i -1) == b(j - 2) && a(i - 2) == b(j - 1)) {
dist(i)(j) = Math.min(dist(i)(j), dist(i - 2)(j - 2) + cost)
}
}
}
}
}
dist(aLength)(bLength)
}
}
}
}
package de.codecentric.amuttsch.bahndelayinfo.utils
object TraversableImplicits {
implicit class Crossable[X](xs: Traversable[X]) {
def cross[Y](ys: Traversable[Y]): Traversable[(X, Y)] = for {x <- xs; y <- ys } yield (x, y)
}
}
Markdown is supported