V jeho docstring, elasticsearch.helpers.async_bulk
sám sebe popisuje jako
Pomocník pro :meth:
~elasticsearch.AsyncElasticsearch.bulk
api, které poskytuje více lidské přátelské rozhraní - to spotřebuje iterátor akcí a posílá je do elasticsearch na kousky. zdroj
Souvislosti
Byl jsem pomocí AsyncElasticsearch.bulk()
úspěšně posílat pandy datové části rámců pro některé instance ES
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Problém
Nicméně, když přijde na async_bulk
Jsem stále index is missing
chyby.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Snažil naladit _rec_to_actions()
v několika způsoby, bez valného efektu.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Myslím, že hlavní problém je, že já nejsem zcela jistý, vědět , co je akce, v souvislosti s elasticsearch. Tento pojem je všude v dokumentaci, ale nemá jasnou strukturu dat protějšek v této knihovně zdrojový kód (nikdo, že jsem mohl najít, tak jako tak)
Co je to přesně za akci a jak to mám naladit můj generátor poslat df údaje self.index
?
prostředí
- python = "3.9.5"
- elasticsearch = "7.14.1"