Commit 929a6bf5 authored by Andreas Muttscheller's avatar Andreas Muttscheller

Improve code

parent e52a8e9c
......@@ -16,7 +16,7 @@ libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3",
// Http client
"com.softwaremill.sttp" %% "core" % "1.5.4",
"com.softwaremill.sttp" %% "core" % "1.5.8",
// xml and json
"org.scala-lang.modules" %% "scala-xml" % "1.1.1",
......@@ -26,9 +26,9 @@ libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "2.1.0",
// AWS
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.483",
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.483",
"com.amazonaws" % "aws-java-sdk-lambda" % "1.11.483",
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.488",
"com.amazonaws" % "aws-java-sdk-sns" % "1.11.488",
"com.amazonaws" % "aws-java-sdk-lambda" % "1.11.488",
"com.amazonaws" % "aws-lambda-java-events" % "2.2.5",
"com.amazonaws" % "aws-lambda-java-core" % "1.2.0",
......
......@@ -15,6 +15,7 @@ provider:
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:Query
- dynamodb:DeleteItem
- sns:CreateTopic
- sns:Publish
- es:ESHttpPost
......@@ -61,8 +62,9 @@ functions:
events:
# Monday to Friday - every 2 minutes - from 6-10 and 15-19
# UTC !!!
- schedule: cron(0/2 5-10 ? * MON-FRI *)
- schedule: cron(0/2 14-19 ? * MON-FRI *)
#- schedule: cron(0/2 5-10 ? * MON-FRI *)
#- schedule: cron(0/2 14-19 ? * MON-FRI *)
- schedule: cron(0/2 5-19 ? * MON-FRI *)
SlackChangeReporter:
handler: de.codecentric.amuttsch.bahndelayinfo.aws.lambda.SNSChangeSlackReporter::handleRequest
reservedConcurrency: 1 # Only one instance may run at a time
......
......@@ -22,7 +22,9 @@ import scala.util.{Failure, Success, Try}
class APISlackBotEventHandlerWorker {
implicit val logger: Logger = Logger(classOf[APISlackBotEventHandlerWorker])
implicit val jsonFormats: Formats = DefaultFormats + JsonSerializers.LocalTimeSerializer
implicit val jsonFormats: Formats = DefaultFormats +
JsonSerializers.LocalTimeSerializer +
JsonSerializers.LocalDateTimeSerializer
implicit val system: ActorSystem = ActorSystem("slack")
val ddbClient: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard.build
......@@ -72,8 +74,9 @@ class APISlackBotEventHandlerWorker {
case JString("message") =>
val msg = (json \ "event").extract[Message]
if (!(json \ "authed_users").extract[Array[String]].contains(msg.user)) {
logger.info("Ignoring my sent message!")
parseMessage(msg)
} else {
logger.info("Ignoring my sent message!")
}
case _ =>
......@@ -102,7 +105,7 @@ class APISlackBotEventHandlerWorker {
}
def parseInteractiveMessageAction(msg: InteractiveMessage): Unit = {
implicit val slackUser: SlackUser = getOrCreateUser(msg.user.id, msg.channel.id)
implicit var slackUser: SlackUser = getOrCreateUser(msg.user.id, msg.channel.id)
msg.callback_id match {
case "unsubscribe_delay_id" =>
......@@ -118,6 +121,9 @@ class APISlackBotEventHandlerWorker {
.withPrimaryKey("id", slackUser.user)
tableSlackUsers.putItem(item)
// Reload slack user, otherwise the removed delay subscription is still
// visible
slackUser = getOrCreateUser(msg.user.id, msg.channel.id)
ListDelaySubscriptions.execute(List.empty)
}
case _ => logger.warn(s"Unknown callbackId ${msg.callback_id}")
......@@ -129,6 +135,7 @@ class APISlackBotEventHandlerWorker {
case "help" :: Nil =>
help(message)
case cmd :: params =>
logger.info(s"Executing command $cmd with $params")
implicit val slackUser: SlackUser = getOrCreateUser(message)
commands(cmd).execute(params)
case tail =>
......@@ -150,7 +157,7 @@ class APISlackBotEventHandlerWorker {
| Send delay information from Deutsche Bahn via Slack when your train is running late!
|
| Commands:
| ${commands.values.map(c => c.printHelp).mkString("\t", "\n\t", "\n")}
|${commands.values.map(c => c.printHelp).mkString("", "\n\n", "\n")}
| Note: Regional trains have a unique number after the line in brackets, e.g. RB 68 (15311), where 15311 is the
| train number. If you only want to get delay information for specific trains rather than all trains for a
......@@ -174,6 +181,7 @@ class APISlackBotEventHandlerWorker {
def getOrCreateUser(userId: String, channelId: String): SlackUser = {
tableSlackUsers.getItem("id", userId) match {
case item: Item =>
logger.info(s"getOrCreateUser ${item.toJSON}")
parse(item.toJSON()).extract[SlackUser]
case null =>
val newSlackUser = SlackUser(userId, channelId)
......
......@@ -9,19 +9,23 @@ import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.slackbot.SlackUser
import de.codecentric.amuttsch.bahndelayinfo.utils.JsonSerializers
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import slack.api.BlockingSlackApiClient
import de.codecentric.amuttsch.bahndelayinfo.utils.TraversableImplicits._
import scala.util.{Success, Try}
class SNSChangeSlackReporter {
implicit val logger: Logger = Logger(SNSChangeSlackReporter.getClass)
implicit val jsonFormats: DefaultFormats.type = DefaultFormats
implicit val jsonFormats: Formats = DefaultFormats +
JsonSerializers.LocalDateTimeSerializer +
JsonSerializers.LocalTimeSerializer
val apiConf: Config = ConfigFactory.load("api")
val token: String = apiConf.getString("slack.token")
......@@ -51,40 +55,47 @@ class SNSChangeSlackReporter {
slackRecipients = slackRecipients + parse(item.toJSON).extract[SlackUser]
}
logger.info(s"Slack Recipients:\n${slackRecipients.mkString("\n")}")
val stops = tableTimetableStops.scan()
aws.dynamoDBScanToScala(stops)
.map(item => TimetableInformation.fromJson(item.getJSON("tti")))
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 2)
.cross(slackRecipients)
.filter { case (tti, sr) => sr.delayRegistration.exists(d => d.eva == tti.eva && tti.train.matches(d.trainRegex)) }
.filter { case (tti, sr) => sr.delayRegistration.exists(d => d.direction.isEmpty || tti.pathTo.contains(d.direction.get)) }
.filter { case (tti, sr) => sr.delayRegistration.exists(d => d.from.isDefined && d.until.isDefined &&
d.from.get.isBefore(LocalTime.from(tti.plannedTime)) && d.until.get.isAfter(LocalTime.from(tti.plannedTime))) }
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
}
.foreach { case (tti, sr) =>
val resp = slackApiClient.postChatMessage(
sr.channel,
s"`${tti.toString}`"
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}: $resp")
sr.seenTimetableInformation += (tti.id -> tti)
sr.seenTimetableInformation.retain((_, v) => !v.isHistory)
val item = Item
.fromJSON(write(sr))
.withPrimaryKey("id", sr.user)
tableSlackUsers.putItem(item)
}
stops.forEach { o =>
Some(o)
.map(item => Try(TimetableInformation.fromJson(item.getJSON("tti"))))
.collect { case Success(value) => value}
.filter(_.eva == newDelayInformation.eva)
.filter(!_.isHistory)
.filter(_.delayInMinutes > 2)
.cross(slackRecipients)
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
Math.abs(tti.delayInMinutes - sr.seenTimetableInformation(tti.id).delayInMinutes) > 2
}
.flatMap { case (tti, sr) =>
sr.delayRegistration.map(d => (tti, sr, d))
}
.filter { case (tti, _, d) => d.eva == tti.eva && tti.train.matches(d.trainRegex) }
.filter { case (tti, _, d) => d.direction.isEmpty || tti.pathTo.contains(d.direction.get) }
.filter { case (tti, _, d) => d.from.isEmpty || d.until.isEmpty ||
(d.from.isDefined && d.until.isDefined &&
d.from.get.isBefore(LocalTime.from(tti.plannedTime)) && d.until.get.isAfter(LocalTime.from(tti.plannedTime))) }
.foreach { case (tti, sr, _) =>
val resp = slackApiClient.postChatMessage(
sr.channel,
s"`${tti.toString}`"
)
logger.info(s"Sending messages to channel ${sr.channel} for station ${tti.station}: $resp")
val suJson = tableSlackUsers.getItem("id", sr.user).toJSON
val newSlackUser = parse(suJson).extract[SlackUser]
newSlackUser.seenTimetableInformation += (tti.id -> tti)
newSlackUser.seenTimetableInformation.retain((_, v) => !v.isHistory)
val item = Item
.fromJSON(write(newSlackUser))
.withPrimaryKey("id", newSlackUser.user)
tableSlackUsers.putItem(item)
}
}
system.terminate()
}
......
......@@ -13,6 +13,8 @@ import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import scala.util.{Failure, Success, Try}
class ScheduledChangesFetchService {
......@@ -41,15 +43,21 @@ class ScheduledChangesFetchService {
timetableInformations.foreach { tti =>
val item = tableTimetableStops.getItem("id", tti.id)
if (item != null) {
val ttiDb = TimetableInformation.fromJson(item.getJSON("tti"))
val newTti = ttiDb.updateDelay(tti)
// Update entry in dynamodb
val newItem = new Item()
.withPrimaryKey("id", newTti.id)
.withJSON("tti", newTti.toJson)
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(newItem)
val ttiJson = item.getJSON("tti")
Try(TimetableInformation.fromJson(ttiJson)) match {
case Success(ttiDb) =>
val newTti = ttiDb.updateDelay(tti)
// Update entry in dynamodb
val newItem = new Item()
.withPrimaryKey("id", newTti.id)
.withJSON("tti", newTti.toJson)
.withLong("ttl", System.currentTimeMillis() / 1000 + 24 * 60 * 60)
tableTimetableStops.putItem(newItem)
case Failure(_) =>
logger.error(s"Could not parse JSON: $ttiJson")
tableTimetableStops.deleteItem("id", tti.id)
}
}
}
......
package de.codecentric.amuttsch.bahndelayinfo.fetcher
import java.text.SimpleDateFormat
import java.time.{LocalDateTime, LocalTime, ZoneId}
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneId}
import java.util.concurrent.TimeUnit
import java.util.{Calendar, Date, TimeZone}
import de.codecentric.amuttsch.bahndelayinfo.utils.JsonSerializers
import org.json4s._
......@@ -20,18 +18,20 @@ case class TimetableInformation(
train: String,
from: String,
to: String,
plannedTime: LocalTime,
currentTime: LocalTime,
plannedTime: LocalDateTime,
currentTime: LocalDateTime,
pathFrom: List[String],
pathTo: List[String],
finalStop: Boolean = false,
cancelled: Boolean = false
) {
def isHistory: Boolean = {
currentTime.isBefore(LocalTime.now(ZoneId.of("Europe/Berlin")))
currentTime.isBefore(LocalDateTime.now(ZoneId.of("Europe/Berlin")))
}
def delayInMinutes: Long = TimeUnit.SECONDS.toMinutes(currentTime.toSecondOfDay - plannedTime.toSecondOfDay)
def delayInMinutes: Long = {
TimeUnit.SECONDS.toMinutes(currentTime.toLocalTime.toSecondOfDay - plannedTime.toLocalTime.toSecondOfDay)
}
def equalsWithoutCurrentTime(o: Any): Boolean = {
o match {
......@@ -54,7 +54,7 @@ case class TimetableInformation(
val prettyPlannedTime = plannedTime.format(TimetableInformation.printTimeFormat)
val prettyCurrentTime = currentTime.format(TimetableInformation.printTimeFormat)
val delay = TimeUnit.SECONDS.toMinutes(currentTime.toSecondOfDay - plannedTime.toSecondOfDay)
val delay = delayInMinutes
val direction = if (finalStop) "arrival " else "departure"
val plannedTimeString = f"$station: $train%-20s from $from%-20s to $to%-20s: planned $direction $prettyPlannedTime"
......@@ -89,11 +89,10 @@ object TimetableInformation {
private val apiDateTimeFormat = DateTimeFormatter.ofPattern("yyMMddHHmm")
private val printTimeFormat = DateTimeFormatter.ofPattern("HH:mm")
private val customSerializer = new CustomSerializer[String](_ => (
{ case JString(s) => s },
{ case "" => JNothing case s: String => JString(s) }
))
private implicit val jsonFormat: Formats = DefaultFormats + customSerializer + JsonSerializers.LocalTimeSerializer
private implicit val jsonFormat: Formats = DefaultFormats +
JsonSerializers.NoEmptyStringSerializer +
JsonSerializers.LocalTimeSerializer +
JsonSerializers.LocalDateTimeSerializer
def apply(
id: String,
......@@ -102,8 +101,8 @@ object TimetableInformation {
train: String,
from: String,
to: String,
plannedTime: LocalTime,
currentTime: LocalTime,
plannedTime: LocalDateTime,
currentTime: LocalDateTime,
pathFrom: List[String],
pathTo: List[String],
finalStop: Boolean = false,
......@@ -111,13 +110,7 @@ object TimetableInformation {
): TimetableInformation = new TimetableInformation(id, station, eva, train, from, to, plannedTime, currentTime, pathFrom, pathTo, finalStop, cancelled)
def fromJson(json: String): TimetableInformation = {
try {
parse(json).extract[TimetableInformation]
} catch {
case e: Exception =>
println(json)
throw e
}
parse(json).extract[TimetableInformation]
}
def fromXML(s: Node, station: String, eva: String): TimetableInformation = {
......@@ -162,8 +155,8 @@ object TimetableInformation {
prettyTrainName,
from,
to,
plannedTime.toLocalTime,
currentTime.toLocalTime,
plannedTime,
currentTime,
arPpth.toList,
dpPpth.toList,
(s \ "dp").isEmpty,
......
......@@ -17,11 +17,10 @@ import com.softwaremill.sttp._
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.Station
import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.slackbot.ListDelaySubscriptions.timeFormatter
import de.codecentric.amuttsch.bahndelayinfo.utils.JsonSerializers
import org.json4s.{DefaultFormats, Formats}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write
import org.json4s.{DefaultFormats, Formats}
import slack.api.BlockingSlackApiClient
import slack.models.{ActionField, Attachment}
......@@ -31,7 +30,10 @@ import scala.util.{Failure, Success, Try}
sealed trait SlackCommand {
implicit val sttpBackend: SttpBackend[Id, Nothing] = HttpURLConnectionBackend()
implicit val jsonFormats: Formats = DefaultFormats + JsonSerializers.LocalTimeSerializer + JsonSerializers.NoEmptyStringSerializer
implicit val jsonFormats: Formats = DefaultFormats +
JsonSerializers.LocalTimeSerializer +
JsonSerializers.NoEmptyStringSerializer +
JsonSerializers.LocalDateTimeSerializer
val timeFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm")
......@@ -112,7 +114,7 @@ trait ParameterParser extends SlackCommand {
}
}
object SearchStation extends SlackCommand with StationFinder {
case object SearchStation extends SlackCommand with StationFinder {
override val cmd: String = "searchStation"
override val help: String = "searchStation <station>"
override val description: String =
......@@ -142,7 +144,7 @@ object SearchStation extends SlackCommand with StationFinder {
}
}
object AddStation extends SlackCommand with StationFinder {
case object AddStation extends SlackCommand with StationFinder {
override val cmd: String = "addStation"
override val help: String = "addStation <station>"
override val description: String =
......@@ -184,7 +186,7 @@ object AddStation extends SlackCommand with StationFinder {
}
}
object SubscribeDelay extends SlackCommand with StationFinder with ParameterParser {
case object SubscribeDelay extends SlackCommand with StationFinder with ParameterParser {
override val cmd: String = "subscribeDelay"
override val help: String = "subscribeDelay -t <train> -s <station>"
override val description: String =
......@@ -286,7 +288,7 @@ object SubscribeDelay extends SlackCommand with StationFinder with ParameterPars
}
}
object ListActiveStations extends SlackCommand with StationFinder with ParameterParser {
case object ListActiveStations extends SlackCommand with StationFinder with ParameterParser {
override val cmd: String = "listActiveStations"
override val help: String = "listActiveStations"
override val description: String =
......@@ -315,7 +317,7 @@ object ListActiveStations extends SlackCommand with StationFinder with Parameter
}
}
object ListDelaySubscriptions extends SlackCommand with StationFinder with ParameterParser {
case object ListDelaySubscriptions extends SlackCommand with StationFinder with ParameterParser {
override val cmd: String = "listDelaySubscriptions"
override val help: String = "listDelaySubscriptions"
override val description: String =
......@@ -336,7 +338,7 @@ object ListDelaySubscriptions extends SlackCommand with StationFinder with Param
val direction = d.direction.getOrElse("")
Attachment(
text = Some(s":train2: ${d.trainRegex} :station: $station :clock1: $from - $until :arrow_right: $direction"),
text = Some(s":train2: ${d.trainRegex} \t:station: $station \t:clock1: $from - $until \t:arrow_right: $direction"),
callback_id = Some("unsubscribe_delay_id"),
color = Some("#3AA3E3"),
actions = Some(Seq(
......@@ -360,7 +362,7 @@ object ListDelaySubscriptions extends SlackCommand with StationFinder with Param
}
}
object SearchTrain extends SlackCommand with StationFinder with ParameterParser {
case object SearchTrain extends SlackCommand with StationFinder with ParameterParser {
override val cmd: String = "searchTrain"
override val help: String = "searchTrain <station> <search string>"
override val description: String =
......
package de.codecentric.amuttsch.bahndelayinfo.utils
import java.time.LocalTime
import java.time.{LocalDateTime, LocalTime}
import java.time.format.DateTimeFormatter
import org.json4s.{CustomSerializer, JNothing, JString}
object JsonSerializers {
val LocalDateTimeSerializer = new CustomSerializer[LocalDateTime](_ => (
{
case JString(s) => LocalDateTime.parse(s)
},
{
case t: LocalDateTime => JString(t.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME))
}
))
val LocalTimeSerializer = new CustomSerializer[LocalTime](_ => (
{
case JString(s) => LocalTime.parse(s)
......
......@@ -4,4 +4,8 @@ object TraversableImplicits {
implicit class Crossable[X](xs: Traversable[X]) {
def cross[Y](ys: Traversable[Y]): Traversable[(X, Y)] = for {x <- xs; y <- ys } yield (x, y)
}
implicit class CrossableOption[X](xs: Option[X]) {
def cross[Y](ys: Traversable[Y]): Traversable[(X, Y)] = for { y <- ys; if xs.isDefined } yield (xs.get, y)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment