Snažím se napsat kus kódu, který dělá následující:-
- Čte velké csv souboru ze vzdáleného zdroje, jako je s3.
- Proces souboru podle záznamu.
- Odeslat oznámení uživateli
- Zapíše výstup do vzdáleného umístění
Ukázka záznamu ve vstupní csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Můj vstup případě třída, která představuje záznam v vstupní csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Vzorek rekord ve výstupu csv (která musí být písemná):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Můj výstup případě třída, která představuje záznam v vstupní csv:
case class OutputRecord(recordId: String, name: String, designation: String)
Čtení záznamu pomocí akka proud csv (používá Alpakka reaktivní s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Teď mám funkci pro zpracování záznamů:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Funkce psát OutputRecord jako csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Funkce odesílání oznámení e-mailem:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Šití to všechno dohromady
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
Na Řádku 15 a 16 jsem dostat chybu, jsem schopen přidat Řádek 15 nebo Linka 16, ale ne obě, protože obě notify
a writeOutput
potřebuje outputRecord
. Jakmile oznámí se nazývá přijdu outputRecord
.
Je tam způsob, jak mohu přidat i notify
a writeOutput
do stejného grafu?
Nehledám paralelní provádění, jak chci, aby první hovor notify
a pak jen writeOutput
. Takže to není užitečné: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
Použití případ se zdá velmi jednoduché, ale některé, jak jsem nebyl schopen najít čisté řešení.