Jak zapnout Mono do skutečně asynchronní (ne reaktivní!) volání metody?

0

Otázka

Mám metodu

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}

V normální tok programu, jsem volat tuto metodu asynchronně prostřednictvím Kafka události.

Pro účely testování jsem třeba odhalit způsob, jak webové služby, ale tato metoda by měla být vystavena jako asynchronní: vrací pouze HTTP kód 200 OK ("žádost přijata") a dalšího zpracování dat na pozadí.

Je to OK (= nebude to mít žádné nežádoucí vedlejší účinky) jen volat Mono#subscribe() a vrátit se z regulátoru metodou?

@RestController
@RequiredArgsConstructor
public class MyController {
    private final MyService service;

    @GetMapping
    public void processData() {
        service.processData()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}

Nebo je to lepší udělat to takhle (tady jsem zmaten varování od IntelliJ, možná stejné jako https://youtrack.jetbrains.com/issue/IDEA-276018 ?):

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
    return Mono.empty();
}

Nebo nějaké jiné řešení?

2

Nejlepší odpověď

3

Je to OK (= nebude to mít žádné nežádoucí vedlejší účinky) jen volat Mono#přihlásit() a vrátit se z regulátoru metodou?

Tam jsou vedlejší účinky, ale může být ok žít s nimi:

  • To je opravdu oheň a zapomenout - což znamená, že zatímco budete nikdy být informováni o úspěch (což většina lidí neuvědomuje), budete také nikdy být informován o selhání (což mnohem méně lidí si uvědomuje.)
  • Pokud proces visí z nějakého důvodu, že vydavatel nebude nikdy kompletní, a budete mít žádný způsob, jak zjistit. Od vás objednají na ohraničené elastické threadpool, to bude také svázat jeden z těch omezené závity na dobu neurčitou.

První bod by mohl být v pohodě, nebo možná budete chtít dát nějaké protokolování chyb, dále se stanoví, že reaktivní řetězce jako vedlejší účinek nějak tak máte alespoň interní oznámení, pokud se něco pokazí.

Pro druhý bod - já bych doporučil dávat (velkorysý) časový limit na volání metody, tak to alespoň zruší, pokud ještě nebyla dokončena v nastaveném čase, a je již visí kolem spotřebovává zdroje. Pokud používáte asynchronní úkol, pak to není masivní záležitost, jak to bude jen konzumovat trochu paměti. Pokud jste balení blokování volání na elastické plánovač pak je to horší, nicméně, jak jste to vázání závit v tom, že threadpool na dobu neurčitou.

Já bych také otázku, proč je třeba použít elastické, ohraničené plánovač vůbec tady - používá se pro balení blokování hovorů, což se nezdá být základem pro tento případ použití. (Aby bylo jasno, pokud vaše služba je blokování, pak byste měli absolutně zabalte ji na pružné plánovač - ale pokud ne, pak není důvod, aby tak učinily.)

Konečně, tento příklad:

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
    return Mono.empty();
}

...je skvělý příklad toho, co není dělat, když vytváříte jakési "podvodník reaktivní metoda" - někdo může velmi rozumně se přihlásit k odběru, že se vrátil vydavatel myslí, že bude kompletní, když základní vydavatele dokončí, což samozřejmě není to, co se tady děje. Pomocí void návratový typ a tak nevrací nic, je správná věc dělat v této situaci.

2021-11-23 16:54:58
1

Vaše volba s následujícím kódem je vlastně ok:

@GetMapping
public void processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
}

To je vlastně to, co děláte v @Scheduled metoda, která jednoduše vrátí nic a explicitně se přihlásit k odběru Mono nebo Flux takže které prvky jsou emitovány.

2021-11-23 08:36:44

V jiných jazycích

Tato stránka je v jiných jazycích

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................