Commit e52a8e9c authored by Andreas Muttscheller's avatar Andreas Muttscheller

Improve API and commands

parent 934962ef
package de.codecentric.amuttsch.bahndelayinfo.aws.lambda
import java.time.LocalTime
import akka.actor.ActorSystem
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
......@@ -10,12 +12,11 @@ import com.typesafe.scalalogging.Logger
import de.codecentric.amuttsch.bahndelayinfo.aws
import de.codecentric.amuttsch.bahndelayinfo.aws.sns.SNSNewDelayInformation
import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
import de.codecentric.amuttsch.bahndelayinfo.slackmsg.SlackUser
import de.codecentric.amuttsch.bahndelayinfo.slackbot.SlackUser
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import slack.api.BlockingSlackApiClient
import de.codecentric.amuttsch.bahndelayinfo.utils.TraversableImplicits._
class SNSChangeSlackReporter {
......@@ -61,6 +62,9 @@ class SNSChangeSlackReporter {
.filter(_.delayInMinutes > 2)
.cross(slackRecipients)
.filter { case (tti, sr) => sr.delayRegistration.exists(d => d.eva == tti.eva && tti.train.matches(d.trainRegex)) }
.filter { case (tti, sr) => sr.delayRegistration.exists(d => d.direction.isEmpty || tti.pathTo.contains(d.direction.get)) }
.filter { case (tti, sr) => sr.delayRegistration.exists(d => d.from.isDefined && d.until.isDefined &&
d.from.get.isBefore(LocalTime.from(tti.plannedTime)) && d.until.get.isAfter(LocalTime.from(tti.plannedTime))) }
.filter { case (tti, sr) =>
!sr.seenTimetableInformation.contains(tti.id) ||
// Only send messages when time difference is greater than 2 minutes
......
package de.codecentric.amuttsch.bahndelayinfo.fetcher
import java.text.SimpleDateFormat
import java.time.{LocalDateTime, LocalTime, ZoneId}
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
import java.util.{Calendar, Date, TimeZone}
import de.codecentric.amuttsch.bahndelayinfo.utils.JsonSerializers
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
......@@ -17,18 +20,18 @@ case class TimetableInformation(
train: String,
from: String,
to: String,
plannedTime: Date,
currentTime: Date,
plannedTime: LocalTime,
currentTime: LocalTime,
pathFrom: List[String],
pathTo: List[String],
finalStop: Boolean = false,
cancelled: Boolean = false
) {
def isHistory: Boolean = {
currentTime.before(Calendar.getInstance().getTime)
currentTime.isBefore(LocalTime.now(ZoneId.of("Europe/Berlin")))
}
def delayInMinutes: Long = TimeUnit.MILLISECONDS.toMinutes(currentTime.getTime - plannedTime.getTime)
def delayInMinutes: Long = TimeUnit.SECONDS.toMinutes(currentTime.toSecondOfDay - plannedTime.toSecondOfDay)
def equalsWithoutCurrentTime(o: Any): Boolean = {
o match {
......@@ -48,10 +51,10 @@ case class TimetableInformation(
override def toString: String = toString(printDelay = true)
def toString(printDelay: Boolean = true): String = {
val prettyPlannedTime = TimetableInformation.printTimeFormat.format(plannedTime)
val prettyCurrentTime = TimetableInformation.printTimeFormat.format(currentTime)
val prettyPlannedTime = plannedTime.format(TimetableInformation.printTimeFormat)
val prettyCurrentTime = currentTime.format(TimetableInformation.printTimeFormat)
val delay = TimeUnit.MILLISECONDS.toMinutes(currentTime.getTime - plannedTime.getTime)
val delay = TimeUnit.SECONDS.toMinutes(currentTime.toSecondOfDay - plannedTime.toSecondOfDay)
val direction = if (finalStop) "arrival " else "departure"
val plannedTimeString = f"$station: $train%-20s from $from%-20s to $to%-20s: planned $direction $prettyPlannedTime"
......@@ -83,17 +86,14 @@ case class TimetableInformation(
}
object TimetableInformation {
private val jsonDateTimeFormat = new SimpleDateFormat("yyMMddHHmm")
private val printTimeFormat = new SimpleDateFormat("HH:mm")
jsonDateTimeFormat.setTimeZone(TimeZone.getTimeZone("CET"))
printTimeFormat.setTimeZone(TimeZone.getTimeZone("CET"))
private val apiDateTimeFormat = DateTimeFormatter.ofPattern("yyMMddHHmm")
private val printTimeFormat = DateTimeFormatter.ofPattern("HH:mm")
private val customSerializer = new CustomSerializer[String](_ => (
{ case JString(s) => s },
{ case "" => JNothing case s: String => JString(s) }
))
private implicit val jsonFormat: Formats = DefaultFormats + customSerializer
private implicit val jsonFormat: Formats = DefaultFormats + customSerializer + JsonSerializers.LocalTimeSerializer
def apply(
id: String,
......@@ -102,8 +102,8 @@ object TimetableInformation {
train: String,
from: String,
to: String,
plannedTime: Date,
currentTime: Date,
plannedTime: LocalTime,
currentTime: LocalTime,
pathFrom: List[String],
pathTo: List[String],
finalStop: Boolean = false,
......@@ -140,17 +140,17 @@ object TimetableInformation {
val to = if (dpPpth.head.isEmpty) station else dpPpth.reverse(0)
val cancelled = (s \ "ar" \@ "cs") == "c" || (s \ "dp" \@ "cs") == "c"
val plannedTime: Date = if ((s \ "dp" \@ "pt").nonEmpty) {
jsonDateTimeFormat.parse(s \ "dp" \@ "pt")
val plannedTime: LocalDateTime = if ((s \ "dp" \@ "pt").nonEmpty) {
LocalDateTime.parse(s \ "dp" \@ "pt", apiDateTimeFormat)
} else if ((s \ "ar" \@ "pt").nonEmpty) {
jsonDateTimeFormat.parse(s \ "ar" \@ "pt")
LocalDateTime.parse(s \ "ar" \@ "pt", apiDateTimeFormat)
} else {
new Date()
LocalDateTime.now()
}
val currentTime: Date = if ((s \ "dp" \@ "ct").nonEmpty) {
jsonDateTimeFormat.parse(s \ "dp" \@ "ct")
val currentTime: LocalDateTime = if ((s \ "dp" \@ "ct").nonEmpty) {
LocalDateTime.parse(s \ "dp" \@ "ct", apiDateTimeFormat)
} else if ((s \ "ar" \@ "ct").nonEmpty) {
jsonDateTimeFormat.parse(s \ "ar" \@ "ct")
LocalDateTime.parse(s \ "ar" \@ "ct", apiDateTimeFormat)
} else {
plannedTime
}
......@@ -162,8 +162,8 @@ object TimetableInformation {
prettyTrainName,
from,
to,
plannedTime,
currentTime,
plannedTime.toLocalTime,
currentTime.toLocalTime,
arPpth.toList,
dpPpth.toList,
(s \ "dp").isEmpty,
......
//package de.codecentric.amuttsch.bahndelayinfo.reporter
//
//import java.time.Duration
//import java.util.{Collections, Properties}
//
//import com.amazonaws.services.sns.AmazonSNSClientBuilder
//import com.amazonaws.services.sns.model.SubscribeRequest
//import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
//import org.apache.kafka.common.serialization.StringDeserializer
//import org.json.JSONObject
//
//import scala.concurrent.Future
//import scala.concurrent.ExecutionContext.Implicits.global
//
//trait ChangeReporter {
// private var stopFuture = false
// val id: String
//
// def reportChange(id: String, record: JSONObject)
//
// def consumeChanges(topic: String): Future[Int] = {
// stopFuture = false
// Future[Int] {
// val props = {
// val p = new Properties
// p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, scala.util.Properties.envOrElse("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092"))
// p.put(ConsumerConfig.GROUP_ID_CONFIG, s"changed-timetable-reporter-${id}")
// p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
// p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
// p
// }
//
// val consumer = new KafkaConsumer[String, String](props)
// consumer.subscribe(Collections.singleton(topic))
//
// var consumedRecords = 0
// while (!stopFuture) {
// val records = consumer.poll(Duration.ofSeconds(3))
//
// records forEach { record =>
// val jsonRecord = new JSONObject(record.value())
//
// reportChange(record.key(), jsonRecord)
// }
//
// consumedRecords += records.count()
// }
//
// consumer.close()
//
// consumedRecords
// }
// }
//
// def stopConsume(): Unit = stopFuture = true
//}
//package de.codecentric.amuttsch.bahndelayinfo.reporter
//
//import java.text.SimpleDateFormat
//import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
//import java.util.{Calendar, Date}
//
//import de.codecentric.amuttsch.bahndelayinfo.KafkaTopics
//import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
//import org.json.JSONObject
//
//import scala.concurrent.Await
//import scala.concurrent.duration._
//import scala.sys.process._
//
//class ConsoleChangeReporter extends ChangeReporter {
// val jsonDateTimeFormat = new SimpleDateFormat("yyMMddHHmm")
// val printTimeFormat = new SimpleDateFormat("HH:mm")
//
// var timetableInformations: Seq[TimetableInformation] = Seq[TimetableInformation]()
//
// override val id: String = "SlackChangeReporter"
//
// override def reportChange(id: String, s: JSONObject): Unit = {
// try {
// val tti = TimetableInformation.fromJSON(s)
//
// // Remove all seen changes from the past and all past information about this specific stop
// timetableInformations = timetableInformations.filter { ttiFilter =>
// !ttiFilter.equalsWithoutCurrentTime(tti)
// }.filter(_.delayInMinutes > 0) :+ tti
// } catch {
// case _: IllegalArgumentException =>
// }
// }
//
// def printTimetable(station: String): Unit = {
// val ttiCurrentAtStation = timetableInformations
// .filter(_.currentTime.after(Calendar.getInstance().getTime)) // Show only future stops
// .filter(_.station == station) // from the given station
//
// val ttiSortedWithDelay = ttiCurrentAtStation
// .filter(_.delayInMinutes > 0) // with a delay
// .filter(!_.cancelled) // which are not cancelled
// .sortBy(_.plannedTime) // and sort by planned time
//
// println(f"*** Timetable at $station - ${Calendar.getInstance().getTime} ***")
// ttiSortedWithDelay foreach println
//
// def printAverageDelay(trains: Seq[TimetableInformation], trainType: String): Unit = {
// if (trains.nonEmpty) {
// val avgDelay = trains.map(_.delayInMinutes).sum / trains.size
// println(f"Average delay: $avgDelay minutes for ${trains.size} trains ($trainType)")
// }
// }
//
// val nationalTrains = ttiSortedWithDelay.filter(t => t.train.startsWith("IC") || t.train.startsWith("EC"))
// val regionalTrains = ttiSortedWithDelay.filter(t => t.train.startsWith("RE") || t.train.startsWith("RB") || t.train.startsWith("VIA"))
// val sBahnTrains = ttiSortedWithDelay.filter(t => t.train.startsWith("S "))
//
// println()
// printAverageDelay(ttiSortedWithDelay, "all")
// printAverageDelay(nationalTrains, "National Trains")
// printAverageDelay(regionalTrains, "Regional Trains")
// printAverageDelay(sBahnTrains, "S-Bahn")
// println()
//
// println(f"Cancelled trips: ${ttiCurrentAtStation.count(_.cancelled)}")
// ttiCurrentAtStation.filter(_.cancelled) foreach println
// }
//}
//
//object ConsoleChangeReporter extends App {
// val ccr = new ConsoleChangeReporter()
// val f = ccr.consumeChanges(KafkaTopics.TOPIC_TIMETABLE_CHANGES_JSON)
//
// val ex = new ScheduledThreadPoolExecutor(1)
// val task = new Runnable {
// def run(): Unit = {
// try {
// "clear".! // clear screen
// ccr.printTimetable("Frankfurt(Main)Hbf")
// println()
// ccr.printTimetable("Darmstadt Süd")
// println()
// } catch {
// case e: Exception => e.printStackTrace()
// }
// }
// }
// val timerFuture = ex.scheduleAtFixedRate(task, 1, 10, TimeUnit.SECONDS)
//
// val result = Await.ready(f, Duration.Inf).value.get
//
// result match {
// case scala.util.Success(_) => println("finished successfully")
// case scala.util.Failure(exception) => throw exception
// }
//
// ex.awaitTermination(10, TimeUnit.MINUTES)
//}
package de.codecentric.amuttsch.bahndelayinfo.slackmsg
package de.codecentric.amuttsch.bahndelayinfo.slackbot
sealed trait SlackMessage
case class IdAndName(
id: String,
name: String
)
case class UrlVerification(
`type`: String,
token: String,
......@@ -14,4 +19,12 @@ case class Message(
user: String,
text: String,
ts: String
) extends SlackMessage
case class InteractiveMessage(
`type`: String,
callback_id: String,
user: IdAndName,
channel: IdAndName,
actions: List[Map[String, String]]
) extends SlackMessage
\ No newline at end of file
package de.codecentric.amuttsch.bahndelayinfo.slackmsg
package de.codecentric.amuttsch.bahndelayinfo.slackbot
import java.time.LocalTime
import de.codecentric.amuttsch.bahndelayinfo.fetcher.TimetableInformation
......@@ -6,10 +8,11 @@ import scala.collection.mutable
case class DelayRegistration(
trainRegex: String,
eva: String
) {
override def toString: String = s""
}
eva: String,
from: Option[LocalTime],
until: Option[LocalTime],
direction: Option[String]
)
case class SlackUser(
user: String,
......
package de.codecentric.amuttsch.bahndelayinfo.utils
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import org.json4s.{CustomSerializer, JNothing, JString}
object JsonSerializers {
val LocalTimeSerializer = new CustomSerializer[LocalTime](_ => (
{
case JString(s) => LocalTime.parse(s)
},
{
case t: LocalTime => JString(t.format(DateTimeFormatter.ISO_LOCAL_TIME))
}
))
val NoEmptyStringSerializer = new CustomSerializer[String](_ => (
{ case JString(s) => s },
{ case "" => JNothing case s: String => JString(s) }
))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment