Akka flow Input (`V`) jako Výstup ("Out")

0

Otázka

Snažím se napsat kus kódu, který dělá následující:-

  1. Čte velké csv souboru ze vzdáleného zdroje, jako je s3.
  2. Proces souboru podle záznamu.
  3. Odeslat oznámení uživateli
  4. Zapíše výstup do vzdáleného umístění

Ukázka záznamu ve vstupní csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Můj vstup případě třída, která představuje záznam v vstupní csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Vzorek rekord ve výstupu csv (která musí být písemná):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Můj výstup případě třída, která představuje záznam v vstupní csv:

case class OutputRecord(recordId: String, name: String, designation: String)

Čtení záznamu pomocí akka proud csv (používá Alpakka reaktivní s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Teď mám funkci pro zpracování záznamů:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Funkce psát OutputRecord jako csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Funkce odesílání oznámení e-mailem:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Šití to všechno dohromady

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

Na Řádku 15 a 16 jsem dostat chybu, jsem schopen přidat Řádek 15 nebo Linka 16, ale ne obě, protože obě notify a writeOutput potřebuje outputRecord. Jakmile oznámí se nazývá přijdu outputRecord.

Je tam způsob, jak mohu přidat i notify a writeOutput do stejného grafu?

Nehledám paralelní provádění, jak chci, aby první hovor notify a pak jen writeOutput. Takže to není užitečné: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

Použití případ se zdá velmi jednoduché, ale některé, jak jsem nebyl schopen najít čisté řešení.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Nejlepší odpověď

1

Výstup notify je PushResult, ale vstup writeOutput je ByteString. Jakmile změníte, že to bude sestavovat. V případě, že potřebujete ByteString, dostat stejné z OutputRecord.

BTW, v ukázkový kód, který jste za předpokladu, podobná chyba existuje v readCSV a process.

2021-11-24 03:36:16

V jiných jazycích

Tato stránka je v jiných jazycích

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................