Skip to content

API Reference

En esta sección se detalla la documentación del código fuente de la aplicación, extraída directamente de los docstrings. La documentación está organizada siguiendo el flujo de ejecución del procesamiento de los Bills of Lading (BOLs).

Scheduler

El proceso puede ser iniciado automáticamente por un planificador de tareas (scheduler) que se ejecuta a intervalos regulares.

app.scheduler.jobs

Este módulo se encarga de configurar y ejecutar la tarea programada que llama al endpoint de procesamiento de BOLs.

app.scheduler.jobs

Functions

call_process_bol() async

Realiza una llamada HTTP POST al endpoint de procesamiento de BOLs.

Esta función asíncrona construye la URL y la fecha de inicio para la solicitud, y luego envía una petición POST al servicio process_bol.

Returns:

Name Type Description
None

La función no retorna un valor explícito, pero registra el resultado de la llamada HTTP.

Source code in app/scheduler/jobs.py
async def call_process_bol():
    """
    Realiza una llamada HTTP POST al endpoint de procesamiento de BOLs.

    Esta función asíncrona construye la URL y la fecha de inicio para la solicitud,
    y luego envía una petición POST al servicio `process_bol`.

    Args:
        No recibe argumentos.

    Returns:
        None: La función no retorna un valor explícito, pero registra el resultado de la llamada HTTP.
    """
    url = settings.process_bol_url
    timeout = settings.http_client_timeout_seconds

    start_date = datetime.now(ZoneInfo(settings.timezone)).isoformat()

    async with httpx.AsyncClient(timeout=timeout) as client:
        r = await client.post(url, json={"start_date": start_date})
        logger.info(
            "[Scheduler] Called process_bol with start_date=%s -> %s: %s",
            start_date, r.status_code, r.text
        )

start_scheduler()

Inicia el planificador de tareas (scheduler) para ejecutar call_process_bol a intervalos regulares.

Configura un AsyncIOScheduler que ejecutará la función call_process_bol cada cierto número de minutos, definido en las configuraciones de la aplicación.

Returns:

Name Type Description
AsyncIOScheduler AsyncIOScheduler

Una instancia del planificador de tareas iniciado.

Source code in app/scheduler/jobs.py
def start_scheduler() -> AsyncIOScheduler:
    """
    Inicia el planificador de tareas (scheduler) para ejecutar `call_process_bol` a intervalos regulares.

    Configura un `AsyncIOScheduler` que ejecutará la función `call_process_bol`
    cada cierto número de minutos, definido en las configuraciones de la aplicación.

    Args:
        No recibe argumentos.

    Returns:
        AsyncIOScheduler: Una instancia del planificador de tareas iniciado.
    """
    scheduler = AsyncIOScheduler(timezone=settings.timezone)
    scheduler.add_job(call_process_bol, IntervalTrigger(minutes=settings.internal_trigger_minutes))
    scheduler.start()
    logger.info("Scheduler started")
    return scheduler

API Endpoint

El endpoint principal que orquesta todo el flujo de procesamiento de BOLs.

app.api.process

Este módulo contiene el endpoint /fetch-dataload que recibe la solicitud de procesamiento.

app.api.process

Functions

fetch_dataload(request, body) async

Procesa los Bills of Lading (BOLs) para crear y actualizar cargas (loads).

Este endpoint orquesta un flujo de varios pasos que comienza con la recuperación de archivos adjuntos (BOLs en formato PDF) desde una cuenta de correo electrónico a partir de una fecha determinada. Posteriormente, consulta información adicional de los BOLs en un sistema externo (Primus), procesa los PDFs utilizando un agente multimodal para extraer datos clave, y enriquece la información con datos de un CRM. Finalmente, crea y actualiza los registros de carga (loads) en el CRM.

Parameters:

Name Type Description Default
request Request

El objeto de solicitud de FastAPI, que contiene el estado de la aplicación (clientes de servicios, etc.).

required
body StartDateRequest

El cuerpo de la solicitud, que debe contener la fecha de inicio para filtrar los correos electrónicos.

required

Returns:

Name Type Description
JSONResponse

Una respuesta JSON que contiene los IDs de las cargas (loads) creadas en el CRM, con un código de estado 200 en caso de éxito. Si no se encuentran archivos adjuntos, retorna una lista vacía.

Source code in app/api/process.py
@router.post(
    "/fetch-dataload",
    response_class=JSONResponse,
    response_model=List[FinalLoad],
    status_code=200,
    responses={200: {"description": "Loads fetched successfully"}},
)
async def fetch_dataload(
    request: Request, 
    body: StartDateRequest
):
    """
    Procesa los Bills of Lading (BOLs) para crear y actualizar cargas (loads).

    Este endpoint orquesta un flujo de varios pasos que comienza con la recuperación de 
    archivos adjuntos (BOLs en formato PDF) desde una cuenta de correo electrónico a partir 
    de una fecha determinada. Posteriormente, consulta información adicional de los BOLs 
    en un sistema externo (Primus), procesa los PDFs utilizando un agente multimodal 
    para extraer datos clave, y enriquece la información con datos de un CRM. 
    Finalmente, crea y actualiza los registros de carga (loads) en el CRM.

    Args:
        request (Request): El objeto de solicitud de FastAPI, que contiene el estado de la 
                           aplicación (clientes de servicios, etc.).
        body (StartDateRequest): El cuerpo de la solicitud, que debe contener la fecha 
                                 de inicio para filtrar los correos electrónicos.

    Returns:
        JSONResponse: Una respuesta JSON que contiene los IDs de las cargas (loads) creadas 
                      en el CRM, con un código de estado 200 en caso de éxito. Si no se 
                      encuentran archivos adjuntos, retorna una lista vacía.
    """
    t0 = time.time()

    state: AppState = request.app.state

    # clientes inicializados en startup_event
    mail_client = state.mail_client
    primus_client = state.primus_client
    crm_client = state.crm_client
    megatron_client = state.megatron_client

    start_dt = datetime.fromisoformat(body.start_date)

    # 1. Traer adjuntos desde correo
    attachments = await mail_client.fetch_bol_attachments(start_dt, limit=10)
    if not attachments:
        logger.warning("No attachments found for start_date=%s", start_dt)
        return JSONResponse(
            status_code=200,
            content=jsonable_encoder([]),
        )

    bol_codes, filtered_attach = mail_client.extract_bol_codes(attachments)
    logger.info(f"{len(bol_codes)} BOLs were found: {bol_codes}")

    # 2. Consultar booking data en Primus
    primus_data = await primus_client.get_booking_data(bol_codes)
    primus_data = primus_data.get("data", {}).get("results", [])
    primus_bols = [str(item["BOLNumber"]) for item in primus_data]

    remains = set(bol_codes) - set(primus_bols) # identificar BOLs sin data en Primus
    if remains:
        logger.warning(f"No data found in Primus for BOLs: {remains}")

        for bol_number in remains:
            email_service.send_email(
                dynamic_data={
                    "bol_number": bol_number,
                    "error_message": f"No data found in Primus for BOL {bol_number}"
                }
            )

    # 3. Procesar cada BOL con el agente multimodal
    llm_results = await process_pdfs(
        agent=multimodal_agent,
        prompt=prompts.PROMPT_BOL,
        attachments=filtered_attach,
        response_schema=response_schema.SCHEMA_BOL_EXTRACTION,
    )

    primus_data = map_units(primus_data,llm_results) # mapear unidades de peso y dimension si es necesario

    # 4. Integrar resultados de CRM (Accounts)
    accounts_results = await crm_client.create_accounts_id(primus_data)
    customer_mode_id = await crm_client.get_customer_and_mode_id(primus_data)
    accessorials_results = map_accesorials_to_glt(primus_data, llm_results, MAPPER_ACCESORIALS_DATA)

    # 5. Integrar resultados de CRM (Contacts)
    contacts_results = await crm_client.create_contacts_id(primus_data, accounts_results)

    # 6. Integrar resultados de Megatron (linear feet)
    linear_feet_by_bol = await megatron_client.calculate_linear_feet_per_bol(primus_data, llm_results)

    # 7. Create loads
    loads = build_final_loads(
        primus_data=primus_data,
        llm_results=llm_results,
        accounts_results=accounts_results,
        contacts_results=contacts_results,
        linear_feet_by_bol=linear_feet_by_bol,
        customer_mode_id=customer_mode_id,
        accessorials_results=accessorials_results,
    )

    loads_id = await crm_client.create_loads(loads)

    # 8. Update loads (carrier & customer quotes) -> NOTE Aquí ya se actualiza el carrier_bill_to
    carrier_services = await crm_client.carrier_service(primus_data)
    await crm_client.create_quotes(primus_data, llm_results, loads_id, carrier_services)
    carrier_bill_ids = await crm_client.create_carrier_bill_id(llm_results)

    # 9. Actualizar datos en los loads.
    updates_data = {
        f"{item['BOLNumber']}": {
            "po_number": ' - '.join([
                item.get("thirdParty", {}).get("referenceNumber", ""),
                item.get("consignee", {}).get("referenceNumber", "")
            ]),
            "pro_number": item.get("vendor",{}).get("PRO",""),
            **(
                {"mode_id": "a0k1I00000B6pAYQAZ"}
                if SCACS_MAPPING.get(
                    item.get("vendor",{}).get("SCAC",""),
                    item.get("vendor",{}).get("SCAC","")) == "FDEG"
                else {}
            )
        }
        for item in primus_data
    }

    for bol_code, values in updates_data.items():
        bill_to_third_party = llm_results.get(bol_code,{}).get('parsed',{}).get("load",{}).get("bill_to_third_party")

        values["external_tms_integration"] = True
        values["carrier_id"] = carrier_services.get(bol_code,{}).get("carrier","")

        if bol_code in carrier_bill_ids:
            values["carrier_bill_id"] = carrier_bill_ids.get(bol_code,{}).get("id","")

        if bill_to_third_party:
            if "BASINGEN GROUP" in bill_to_third_party:
                values["document_options_id"] = DOCUMENT_OPTIONS.get("BASINGEN GROUP","")
            else:
                values["document_options_id"] = DOCUMENT_OPTIONS.get("GLT","")


    await crm_client.update_loads(loads_id, updates_data) # UPDATE LOAD

    elapsed = time.time() - t0
    logger.info(f"Processed {len(bol_codes)} BOLs in {elapsed:.2f} seconds")

    return JSONResponse(
        status_code=200,
        content=loads_id,
    )

Servicios

A continuación se detallan los diferentes servicios y clientes que son utilizados por el endpoint principal para llevar a cabo el procesamiento.

Mail Client

Este servicio se conecta a la API de correo para obtener los correos electrónicos y extraer los archivos adjuntos (PDFs de los BOLs).

app.services.mail_client

app.services.mail_client

Classes

MailClient

Cliente para interactuar con la API interna de correo.

Esta clase gestiona la comunicación con un servicio de correo para obtener correos electrónicos y sus archivos adjuntos, aplicando filtros específicos para los Bills of Lading (BOLs). También implementa un mecanismo para evitar el procesamiento duplicado de correos ya vistos.

Source code in app/services/mail_client.py
class MailClient:
    """
    Cliente para interactuar con la API interna de correo.

    Esta clase gestiona la comunicación con un servicio de correo para obtener 
    correos electrónicos y sus archivos adjuntos, aplicando filtros específicos 
    para los Bills of Lading (BOLs). También implementa un mecanismo para evitar 
    el procesamiento duplicado de correos ya vistos.
    """
    def __init__(self):
        """
        Inicializa el cliente de correo.

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor.
        """
        self.base_url = settings.mail_api_url 
        self.subject_filter = settings.mail_filter_subject
        self.from_filter = settings.mail_filter_from
        self.local_tz = ZoneInfo(settings.timezone)
        self.lookback_minutes = settings.lookback_minutes
        self.timeout = settings.http_client_timeout_seconds

        # Guardamos {graph_id: timestamp_visto}
        self._seen_ids: dict[str, datetime] = {}
        self.ttl_hours = 24  # caducidad de 24h

    def _cleanup_seen_ids(self):
        """
        Limpia los IDs de correos que han superado su tiempo de vida (TTL).

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(hours=self.ttl_hours)
        expired = [gid for gid, ts in self._seen_ids.items() if ts < cutoff]
        for gid in expired:
            del self._seen_ids[gid]

    def _mark_seen(self, graph_id: str):
        """
        Registra un ID de correo como "visto".

        Args:
            graph_id (str): El ID único del correo a registrar.

        Returns:
            No retorna ningún valor.
        """
        self._seen_ids[graph_id] = datetime.now(timezone.utc)

    def _is_seen(self, graph_id: str) -> bool:
        """
        Verifica si un ID de correo ya ha sido procesado recientemente.

        Args:
            graph_id (str): El ID único del correo a verificar.

        Returns:
            bool: True si el ID ya ha sido visto, False en caso contrario.
        """
        return graph_id in self._seen_ids

    def extract_bol_codes(self, attachments: list[dict]):
        """
        Extrae los códigos de BOL de una lista de archivos adjuntos.

        Utiliza una expresión regular para encontrar archivos adjuntos que coincidan 
        con el patrón "BOL_XXXX.pdf" y extrae el código numérico.

        Args:
            attachments (list[dict]): Una lista de diccionarios de archivos adjuntos.

        Returns:
            tuple[list[str], list[dict]]: Una tupla que contiene una lista de códigos 
                                          de BOL extraídos y una lista de los archivos 
                                          adjuntos que coincidieron con el patrón.
        """
        pattern = re.compile(r"BOL[_-](\d+)\.pdf$", re.IGNORECASE)

        codes = []
        filtered_attachments = []

        for att in attachments:
            name = att.get("name", "")
            match = pattern.search(name)
            if match:
                codes.append(match.group(1))
                filtered_attachments.append(att)

        return codes, filtered_attachments

    async def fetch_bol_attachments(self, start_date: datetime, limit: int = 10) -> list[dict]:
        """
        Recupera adjuntos de correos que contienen Bills of Lading (BOL).

        Consulta el endpoint `/filtered-messages` de la API de correo para obtener 
        mensajes que coincidan con los filtros configurados (asunto y remitente). 
        Luego, para cada correo, obtiene sus archivos adjuntos.

        Args:
            start_date (datetime): La fecha y hora de inicio para filtrar los correos.
            limit (int, optional): El número máximo de correos a procesar. 
                                   Defaults to 10.

        Returns:
            list[dict]: Una lista de diccionarios de archivos adjuntos. Cada diccionario 
                        contiene el nombre ('name') y el contenido ('content') del adjunto.
        """
        # limpiar viejos ids antes de cada consulta
        self._cleanup_seen_ids()

        # asegurar que start_date tiene zona horaria local
        start_date = start_date.replace(tzinfo=self.local_tz)

        # convertir a UTC después de ajustar el lookback
        utc_start_date = (start_date - timedelta(minutes=self.lookback_minutes)).astimezone(timezone.utc)
        logger.info("[Graph] Called GRAPH start_date=%s", utc_start_date.isoformat())

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.get(
                f"{self.base_url}/api/graph/filtered-messages",
                params={
                    "start_date": utc_start_date.isoformat(),  # Graph espera UTC
                    "limit": limit,
                    "subject_filter": self.subject_filter,
                    "from_filter": self.from_filter,
                },
            )
            resp.raise_for_status()
            mails = resp.json()

            att_data = []
            for mail in mails:
                graph_id = mail.get("graph_id")
                if not graph_id:
                    continue

                # ignorar si ya fue procesado
                if self._is_seen(graph_id):
                    continue

                att_resp = await client.get(f"{self.base_url}/api/graph/v2/attachments/{graph_id}")
                att_resp.raise_for_status()
                attachments = att_resp.json()

                if attachments:
                    att_data.extend(attachments)
                    self._mark_seen(graph_id)

        return att_data
Functions
__init__()

Inicializa el cliente de correo.

Returns:

Type Description

No retorna ningún valor.

Source code in app/services/mail_client.py
def __init__(self):
    """
    Inicializa el cliente de correo.

    Args:
        No recibe argumentos.

    Returns:
        No retorna ningún valor.
    """
    self.base_url = settings.mail_api_url 
    self.subject_filter = settings.mail_filter_subject
    self.from_filter = settings.mail_filter_from
    self.local_tz = ZoneInfo(settings.timezone)
    self.lookback_minutes = settings.lookback_minutes
    self.timeout = settings.http_client_timeout_seconds

    # Guardamos {graph_id: timestamp_visto}
    self._seen_ids: dict[str, datetime] = {}
    self.ttl_hours = 24  # caducidad de 24h
extract_bol_codes(attachments)

Extrae los códigos de BOL de una lista de archivos adjuntos.

Utiliza una expresión regular para encontrar archivos adjuntos que coincidan con el patrón "BOL_XXXX.pdf" y extrae el código numérico.

Parameters:

Name Type Description Default
attachments list[dict]

Una lista de diccionarios de archivos adjuntos.

required

Returns:

Type Description

tuple[list[str], list[dict]]: Una tupla que contiene una lista de códigos de BOL extraídos y una lista de los archivos adjuntos que coincidieron con el patrón.

Source code in app/services/mail_client.py
def extract_bol_codes(self, attachments: list[dict]):
    """
    Extrae los códigos de BOL de una lista de archivos adjuntos.

    Utiliza una expresión regular para encontrar archivos adjuntos que coincidan 
    con el patrón "BOL_XXXX.pdf" y extrae el código numérico.

    Args:
        attachments (list[dict]): Una lista de diccionarios de archivos adjuntos.

    Returns:
        tuple[list[str], list[dict]]: Una tupla que contiene una lista de códigos 
                                      de BOL extraídos y una lista de los archivos 
                                      adjuntos que coincidieron con el patrón.
    """
    pattern = re.compile(r"BOL[_-](\d+)\.pdf$", re.IGNORECASE)

    codes = []
    filtered_attachments = []

    for att in attachments:
        name = att.get("name", "")
        match = pattern.search(name)
        if match:
            codes.append(match.group(1))
            filtered_attachments.append(att)

    return codes, filtered_attachments
fetch_bol_attachments(start_date, limit=10) async

Recupera adjuntos de correos que contienen Bills of Lading (BOL).

Consulta el endpoint /filtered-messages de la API de correo para obtener mensajes que coincidan con los filtros configurados (asunto y remitente). Luego, para cada correo, obtiene sus archivos adjuntos.

Parameters:

Name Type Description Default
start_date datetime

La fecha y hora de inicio para filtrar los correos.

required
limit int

El número máximo de correos a procesar. Defaults to 10.

10

Returns:

Type Description
list[dict]

list[dict]: Una lista de diccionarios de archivos adjuntos. Cada diccionario contiene el nombre ('name') y el contenido ('content') del adjunto.

Source code in app/services/mail_client.py
async def fetch_bol_attachments(self, start_date: datetime, limit: int = 10) -> list[dict]:
    """
    Recupera adjuntos de correos que contienen Bills of Lading (BOL).

    Consulta el endpoint `/filtered-messages` de la API de correo para obtener 
    mensajes que coincidan con los filtros configurados (asunto y remitente). 
    Luego, para cada correo, obtiene sus archivos adjuntos.

    Args:
        start_date (datetime): La fecha y hora de inicio para filtrar los correos.
        limit (int, optional): El número máximo de correos a procesar. 
                               Defaults to 10.

    Returns:
        list[dict]: Una lista de diccionarios de archivos adjuntos. Cada diccionario 
                    contiene el nombre ('name') y el contenido ('content') del adjunto.
    """
    # limpiar viejos ids antes de cada consulta
    self._cleanup_seen_ids()

    # asegurar que start_date tiene zona horaria local
    start_date = start_date.replace(tzinfo=self.local_tz)

    # convertir a UTC después de ajustar el lookback
    utc_start_date = (start_date - timedelta(minutes=self.lookback_minutes)).astimezone(timezone.utc)
    logger.info("[Graph] Called GRAPH start_date=%s", utc_start_date.isoformat())

    async with httpx.AsyncClient(timeout=self.timeout) as client:
        resp = await client.get(
            f"{self.base_url}/api/graph/filtered-messages",
            params={
                "start_date": utc_start_date.isoformat(),  # Graph espera UTC
                "limit": limit,
                "subject_filter": self.subject_filter,
                "from_filter": self.from_filter,
            },
        )
        resp.raise_for_status()
        mails = resp.json()

        att_data = []
        for mail in mails:
            graph_id = mail.get("graph_id")
            if not graph_id:
                continue

            # ignorar si ya fue procesado
            if self._is_seen(graph_id):
                continue

            att_resp = await client.get(f"{self.base_url}/api/graph/v2/attachments/{graph_id}")
            att_resp.raise_for_status()
            attachments = att_resp.json()

            if attachments:
                att_data.extend(attachments)
                self._mark_seen(graph_id)

    return att_data

Primus Client

Cliente utilizado para conectarse a la API de Primus y obtener la información de las reservas (bookings) asociada a los BOLs.

app.services.primus_client

app.services.primus_client

Classes

PrimusClient

Cliente para interactuar con la API de Primus.

Esta clase maneja la autenticación y las solicitudes a la API de Primus para obtener datos de reservas (bookings).

Source code in app/services/primus_client.py
class PrimusClient:
    """
    Cliente para interactuar con la API de Primus.

    Esta clase maneja la autenticación y las solicitudes a la API de Primus
    para obtener datos de reservas (bookings).
    """
    def __init__(self):
        """
        Inicializa el cliente de Primus.

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor.
        """
        self.base_url = settings.primus_base_url
        self.user = settings.primus_user
        self.password = settings.primus_password
        self.timeout = settings.http_client_timeout_seconds
        self.token = None

    async def login(self):
        """
        Autentica con la API de Primus y almacena el token de acceso.

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor, pero almacena el token de acceso en self.token.
            Levanta una excepción si la autenticación falla.
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.post(
                f"{self.base_url}/login",
                json={"username": self.user, "password": self.password}
            )
            resp.raise_for_status()
            data = resp.json()
            self.token = data.get("data", {}).get("accessToken")

    async def get_booking_data(self, bol_numbers: list[str]) -> dict:
        """
        Recupera la información de la reserva para una lista de números de BOL de Primus.

        Args:
            bol_numbers (list[str]): Una lista de números de BOL (Bill of Lading) para consultar.

        Returns:
            dict: Un diccionario con los datos de la reserva obtenidos de la API.
                  Si el token ha expirado, intenta autenticarse de nuevo y reintentar la solicitud.
            Levanta una excepción si la solicitud a la API falla.
        """
        if not self.token:
            await self.login()

        headers = {"Authorization": f"Bearer {self.token}"}

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.get(
                f"{self.base_url}/book",
                headers=headers,
                params={
                    "BOLNumber[]": bol_numbers
                }
            )
            if resp.status_code == 401:  # Token expirado
                await self.login()
                return await self.get_booking_data(bol_numbers)

            resp.raise_for_status()
            return resp.json()
Functions
__init__()

Inicializa el cliente de Primus.

Returns:

Type Description

No retorna ningún valor.

Source code in app/services/primus_client.py
def __init__(self):
    """
    Inicializa el cliente de Primus.

    Args:
        No recibe argumentos.

    Returns:
        No retorna ningún valor.
    """
    self.base_url = settings.primus_base_url
    self.user = settings.primus_user
    self.password = settings.primus_password
    self.timeout = settings.http_client_timeout_seconds
    self.token = None
get_booking_data(bol_numbers) async

Recupera la información de la reserva para una lista de números de BOL de Primus.

Parameters:

Name Type Description Default
bol_numbers list[str]

Una lista de números de BOL (Bill of Lading) para consultar.

required

Returns:

Name Type Description
dict dict

Un diccionario con los datos de la reserva obtenidos de la API. Si el token ha expirado, intenta autenticarse de nuevo y reintentar la solicitud.

dict

Levanta una excepción si la solicitud a la API falla.

Source code in app/services/primus_client.py
async def get_booking_data(self, bol_numbers: list[str]) -> dict:
    """
    Recupera la información de la reserva para una lista de números de BOL de Primus.

    Args:
        bol_numbers (list[str]): Una lista de números de BOL (Bill of Lading) para consultar.

    Returns:
        dict: Un diccionario con los datos de la reserva obtenidos de la API.
              Si el token ha expirado, intenta autenticarse de nuevo y reintentar la solicitud.
        Levanta una excepción si la solicitud a la API falla.
    """
    if not self.token:
        await self.login()

    headers = {"Authorization": f"Bearer {self.token}"}

    async with httpx.AsyncClient(timeout=self.timeout) as client:
        resp = await client.get(
            f"{self.base_url}/book",
            headers=headers,
            params={
                "BOLNumber[]": bol_numbers
            }
        )
        if resp.status_code == 401:  # Token expirado
            await self.login()
            return await self.get_booking_data(bol_numbers)

        resp.raise_for_status()
        return resp.json()
login() async

Autentica con la API de Primus y almacena el token de acceso.

Returns:

Type Description

No retorna ningún valor, pero almacena el token de acceso en self.token.

Levanta una excepción si la autenticación falla.

Source code in app/services/primus_client.py
async def login(self):
    """
    Autentica con la API de Primus y almacena el token de acceso.

    Args:
        No recibe argumentos.

    Returns:
        No retorna ningún valor, pero almacena el token de acceso en self.token.
        Levanta una excepción si la autenticación falla.
    """
    async with httpx.AsyncClient(timeout=self.timeout) as client:
        resp = await client.post(
            f"{self.base_url}/login",
            json={"username": self.user, "password": self.password}
        )
        resp.raise_for_status()
        data = resp.json()
        self.token = data.get("data", {}).get("accessToken")

Agente de IA (PDF Processing)

Este conjunto de módulos se encarga de procesar los archivos PDF de los BOLs utilizando un modelo de IA generativa (Gemini) para extraer la información estructurada.

app.services.agent.process_pdf

Este módulo orquesta el procesamiento en paralelo de los PDFs, gestionando la concurrencia y los reintentos.

app.services.agent.process_pdf

Classes

Functions

process_pdfs(agent, prompt, attachments, response_schema) async

Procesa una lista de archivos PDF en paralelo utilizando el LLM.

Crea y ejecuta tareas asíncronas para cada archivo adjunto, gestionando la concurrencia. Recopila los resultados y maneja las excepciones de forma individual para cada tarea, asegurando que el fallo de un PDF no detenga el procesamiento de los demás.

Parameters:

Name Type Description Default
agent GenaiProcessor

La instancia del procesador Genai a utilizar.

required
prompt str

El prompt a enviar al LLM para cada PDF.

required
attachments list[dict]

Una lista de diccionarios, donde cada uno representa un archivo PDF a procesar.

required
response_schema dict

El esquema JSON que el LLM debe seguir para las respuestas.

required

Returns:

Type Description
dict[str, dict]

dict[str, dict]: Un diccionario consolidado con los resultados exitosos, donde cada clave es un número de BOL. Los procesamientos fallidos se registran como errores y no se incluyen en el retorno.

Source code in app/services/agent/process_pdf.py
async def process_pdfs(
    agent: GenaiProcessor,
    prompt: str,
    attachments: list[dict],
    response_schema: dict,
) -> dict[str, dict]:
    """
    Procesa una lista de archivos PDF en paralelo utilizando el LLM.

    Crea y ejecuta tareas asíncronas para cada archivo adjunto, gestionando la
    concurrencia. Recopila los resultados y maneja las excepciones de forma
    individual para cada tarea, asegurando que el fallo de un PDF no detenga
    el procesamiento de los demás.

    Args:
        agent (GenaiProcessor): La instancia del procesador Genai a utilizar.
        prompt (str): El prompt a enviar al LLM para cada PDF.
        attachments (list[dict]): Una lista de diccionarios, donde cada uno
            representa un archivo PDF a procesar.
        response_schema (dict): El esquema JSON que el LLM debe seguir para
            las respuestas.

    Returns:
        dict[str, dict]: Un diccionario consolidado con los resultados exitosos,
            donde cada clave es un número de BOL. Los procesamientos fallidos
            se registran como errores y no se incluyen en el retorno.
    """
    tasks = [
        _call_llm_threadsafe(agent, prompt, att, response_schema)
        for att in attachments
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    final_results: dict[str, dict] = {}
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            logger.error("Falló el procesamiento del PDF %d: %s", i, r)
            continue

        # r tiene la forma {"BOL123": {"parsed": ..., "usage": ...}}
        final_results.update(r)

    return final_results

retry_policy()

Define y retorna una política de reintentos para manejar errores transitorios.

Utiliza una estrategia de backoff exponencial, reintentando hasta 5 veces cuando se producen errores específicos como RateLimitError, APIError de Google, o HTTPStatusError.

Returns:

Name Type Description
Callable Callable

Un decorador de reintentos de la librería tenacity.

Source code in app/services/agent/process_pdf.py
def retry_policy() -> Callable:
    """
    Define y retorna una política de reintentos para manejar errores transitorios.

    Utiliza una estrategia de backoff exponencial, reintentando hasta 5 veces
    cuando se producen errores específicos como `RateLimitError`, `APIError` de Google,
    o `HTTPStatusError`.

    Returns:
        Callable: Un decorador de reintentos de la librería `tenacity`.
    """
    return retry(
        wait=wait_exponential(multiplier=1, min=2, max=30),  # 2s, 4s, 8s... hasta 30s
        stop=stop_after_attempt(5),  # máximo 5 intentos
        retry=retry_if_exception_type((RateLimitError, APIError, HTTPStatusError)),
        reraise=True,
    )

safe_json_load(response, bol_number)

Parsea una cadena JSON de forma segura, manejando posibles errores.

Si la cadena no es un JSON válido, registra un error, envía un correo de notificación y retorna una estructura de respuesta por defecto.

Parameters:

Name Type Description Default
response str

La cadena de texto que se espera que sea un JSON.

required
bol_number str

El número de BOL asociado a la respuesta, para incluirlo en notificaciones de error.

required

Returns:

Name Type Description
dict dict

El objeto JSON parseado, o un diccionario por defecto si ocurre un error.

Source code in app/services/agent/process_pdf.py
def safe_json_load(response: str, bol_number:str) -> dict:
    """
    Parsea una cadena JSON de forma segura, manejando posibles errores.

    Si la cadena no es un JSON válido, registra un error, envía un correo
    de notificación y retorna una estructura de respuesta por defecto.

    Args:
        response (str): La cadena de texto que se espera que sea un JSON.
        bol_number (str): El número de BOL asociado a la respuesta, para
            incluirlo en notificaciones de error.

    Returns:
        dict: El objeto JSON parseado, o un diccionario por defecto si ocurre un error.
    """
    try:
        return json.loads(response)
    except (json.JSONDecodeError, TypeError):
        logger.error("Error parsing JSON response from LLM")
        email_service.send_email(
            dynamic_data={
                "bol_number": bol_number,
                "error_message": f"[ERROR] Respuesta invalida LLM para BOL: {bol_number}"
            }
        )
        return DEFAULT_LLM_RESPONSE.model_dump()

app.services.agent.genai_client

Este cliente encapsula la comunicación con la API del modelo de IA generativa.

app.services.agent.genai_client

Classes

GenaiProcessor

Clase para interactuar con los modelos de IA generativa de Google a través del SDK de Genai.

Esta clase encapsula la configuración y la comunicación con los modelos de Vertex AI, proporcionando métodos para generar contenido a partir de URIs, bytes de archivos, y para manejar sesiones de chat.

Source code in app/services/agent/genai_client.py
class GenaiProcessor:
    """
    Clase para interactuar con los modelos de IA generativa de Google a través del SDK de Genai.

    Esta clase encapsula la configuración y la comunicación con los modelos de Vertex AI,
    proporcionando métodos para generar contenido a partir de URIs, bytes de archivos,
    y para manejar sesiones de chat.
    """
    def __init__(
        self,
        model_name: str,
        system_instruction: str | None = None,
    ) -> None:
        """
        Inicializa el cliente de GenaiProcessor.

        Args:
            model_name (str): El nombre del modelo a utilizar (ej. "gemini-2.5-pro").
            system_instruction (str | None, optional): Instrucción de sistema para
                guiar el comportamiento del modelo. Defaults to None.
        """
        self.client = genai.Client(
            vertexai=True,
            project=os.environ["VERTEX_PROJECT_ID"],
            location=os.environ["VERTEX_LOCATION"],
        )
        self.model_name = model_name
        self.system_instruction = system_instruction
        self._thinking_models = {"gemini-2.5-flash", "gemini-2.5-pro"}

    def _get_safety_settings(self, threshold: HarmBlockThreshold) -> list[SafetySetting]:
        """
        Construye la configuración de seguridad para las solicitudes a la API.

        Args:
            threshold (HarmBlockThreshold): El nivel de umbral para bloquear contenido
                dañino.

        Returns:
            list[SafetySetting]: Una lista de objetos SafetySetting para cada categoría
                de contenido dañino.
        """
        return [
            SafetySetting(category=category, threshold=threshold)
            for category in [
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
                HarmCategory.HARM_CATEGORY_HARASSMENT,
                HarmCategory.HARM_CATEGORY_HATE_SPEECH,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
            ]
        ]

    def _normalize_files(
        self,
        files: list[dict[str, str]] | None,
        bucket_path: str | None,
        mime_type: str | None,
    ) -> list[dict[str, str]]:
        """
        Normaliza la lista de archivos para la solicitud a la API.

        Valida y formatea la lista de archivos. Si `files` es None, la inicializa.
        Si `bucket_path` se proporciona y la lista de archivos está vacía, crea
        una entrada de archivo para el bucket.

        Args:
            files (list[dict[str, str]] | None): La lista de archivos a normalizar.
                Cada archivo es un dict con 'file_uri' y 'mime_type'.
            bucket_path (str | None): La ruta GCS a un archivo.
            mime_type (str | None): El tipo MIME del archivo en `bucket_path`.

        Returns:
            list[dict[str, str]]: La lista de archivos normalizada.

        Raises:
            ValueError: Si `files` no es una lista de diccionarios o si a un
                archivo le falta 'file_uri' or 'mime_type'.
        """
        if files is None:
            files = []

        if not isinstance(files, list) or not all(isinstance(f, dict) for f in files):
            raise ValueError("`files` must be a list of dictionaries.")

        if not files and bucket_path:
            files = [
                {
                    "file_uri": bucket_path,
                    "mime_type": mime_type or "application/pdf",
                },
            ]

        for f in files:
            if not f.get("file_uri") or not f.get("mime_type"):
                raise ValueError("Each file must include 'file_uri' and 'mime_type'.")

        return files

    def _build_generation_config(
        self,
        *,
        response_schema: dict | None = None,
        response_mime_type: str = "application/json",
        temperature: float = 0.0,
        max_output_tokens: int = 8192,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        top_p: float = 1.0,
        seed: int = 20180507,
        safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
        thinking_budget: int = 0,
    ) -> GenerateContentConfig:
        """
        Construye el objeto de configuración para la generación de contenido.

        Args:
            response_schema (dict | None, optional): El esquema para la respuesta. Defaults to None.
            response_mime_type (str, optional): El tipo MIME de la respuesta. Defaults to "application/json".
            temperature (float, optional): La temperatura de la generación. Defaults to 0.0.
            max_output_tokens (int, optional): Máximo de tokens en la respuesta. Defaults to 8192.
            presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
            frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
            top_p (float, optional): Top-p para el muestreo. Defaults to 1.0.
            seed (int, optional): Semilla para la reproducibilidad. Defaults to 20180507.
            safety_threshold (HarmBlockThreshold, optional): Umbral de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
            thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

        Returns:
            GenerateContentConfig: El objeto de configuración de generación.
        """
        config = {
            "system_instruction": self.system_instruction,
            "response_schema": response_schema,
            "response_mime_type": response_mime_type,
            "temperature": temperature,
            "max_output_tokens": max_output_tokens,
            "presence_penalty": presence_penalty,
            "frequency_penalty": frequency_penalty,
            "top_p": top_p,
            "seed": seed,
            "safety_settings": self._get_safety_settings(safety_threshold),
        }

        if self.model_name in self._thinking_models:
            config["thinking_config"] = ThinkingConfig(
                thinking_budget=thinking_budget,
            )

        return GenerateContentConfig(**config)

    def extract_usage_metadata(self, response: object) -> dict:
        """
        Extrae los metadatos de uso de la respuesta de la API.

        Args:
            response (object): El objeto de respuesta de la API de Genai.

        Returns:
            dict: Un diccionario con el recuento de tokens (prompt, respuesta, total, y thoughts si aplica).
        """
        usage = {
            "prompt_tokens": getattr(response.usage_metadata, "prompt_token_count", None),
            "response_tokens": getattr(response.usage_metadata, "candidates_token_count", None),
            "total_tokens": getattr(response.usage_metadata, "total_token_count", None),
        }

        if self.model_name in self._thinking_models:
            usage["thoughts_tokens"] = getattr(
                response.usage_metadata, "thoughts_token_count", None,
            )

        return usage

    def generate_from_uri(
        self,
        prompt: str,
        *,
        files: list[dict[str, str]] | None = None,
        bucket_path: str | None = None,
        mime_type: str | None = None,
        response_schema: dict | None = None,
        response_mime_type: str = "application/json",
        temperature: float = 0.0,
        max_output_tokens: int = 8192,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        top_p: float = 1.0,
        seed: int = 20180507,
        safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
        thinking_budget: int = 0,
    ) -> dict[str, object]:
        """
        Genera contenido a partir de un prompt y archivos especificados por URI (GCS).

        Args:
            prompt (str): El prompt de texto para el modelo.
            files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
            bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
            mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
            response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
            response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
            temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
            max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
            presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
            frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
            top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
            seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
            safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
            thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

        Returns:
            dict[str, object]: Un diccionario con la respuesta completa de la API,
                los metadatos de uso y la respuesta parseada.

        Raises:
            VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
        """
        files = self._normalize_files(files, bucket_path, mime_type)

        contents = [
            Part.from_uri(
                file_uri=f["file_uri"],
                mime_type=f["mime_type"],
            ) for f in files
        ] + [
            Part.from_text(text=prompt),
        ]

        config = self._build_generation_config(
            response_schema=response_schema,
            response_mime_type=response_mime_type,
            temperature=temperature,
            max_output_tokens=max_output_tokens,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            top_p=top_p,
            seed=seed,
            safety_threshold=safety_threshold,
            thinking_budget=thinking_budget
        )

        try:
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=contents,
                config=config,
            )
        except APIError as e:
            raise VertexAIExceptionError(
                original_exception=e,
            ) from e

        usage = self.extract_usage_metadata(response)

        return {
            "response": response,
            "usage": usage,
            "parsed": self.parse_response(response),
        }

    def generate_from_bytes(
        self,
        prompt: str,
        *,
        pdf_bytes: bytes,
        mime_type: str = "application/pdf",
        response_schema: dict | None = None,
        response_mime_type: str = "application/json",
        temperature: float = 0.0,
        max_output_tokens: int = 8192,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        top_p: float = 1.0,
        seed: int = 20180507,
        safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
        thinking_budget: int = 0,
    ) -> dict[str, object]:
        """
        Genera contenido a partir de un prompt y los bytes de un archivo PDF.

        Args:
            prompt (str): El prompt de texto para el modelo.
            pdf_bytes (bytes): Los bytes del archivo PDF a procesar.
            mime_type (str, optional): El tipo MIME del archivo. Defaults to "application/pdf".
            response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
            response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
            temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
            max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
            presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
            frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
            top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
            seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
            safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
            thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

        Returns:
            dict[str, object]: Un diccionario con la respuesta completa de la API,
                los metadatos de uso y la respuesta parseada.
        """

        contents = [
            Part.from_bytes(
                data=pdf_bytes,
                mime_type = mime_type,
            ),
            Part.from_text(
                text=prompt,
            ),
        ]

        config = {
            "system_instruction": self.system_instruction,
            "response_mime_type": response_mime_type,
            "response_schema": response_schema,
            "temperature": temperature,
            "max_output_tokens": max_output_tokens,
            "presence_penalty": presence_penalty,
            "frequency_penalty": frequency_penalty,
            "top_p": top_p,
            "seed": seed,
            "safety_settings": self._get_safety_settings(
                safety_threshold,
            ),
        }

        if self.model_name in self._thinking_models:
            config["thinking_config"] = ThinkingConfig(
                thinking_budget=thinking_budget,
            )


        response = self.client.models.generate_content(
            model=self.model_name,
            contents=contents,
            config=GenerateContentConfig(**config),
        )

        usage = self.extract_usage_metadata(response)

        return {
            "response": response,
            "usage": usage,
            "parsed": self.parse_response(response),
        }

    def get_chat_session(
        self,
        *,
        temperature: float = 0.0,
        max_output_tokens: int = 8192,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        top_p: float = 1.0,
        seed: int = 20180507,
        safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
        thinking_budget: int = 0,
    ) -> object:
        """
        Crea y retorna una nueva sesión de chat con la configuración especificada.

        Args:
            temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
            max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
            presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
            frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
            top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
            seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
            safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
            thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

        Returns:
            object: Un objeto de sesión de chat del SDK de Genai.
        """
        config = self._build_generation_config(
            temperature=temperature,
            max_output_tokens=max_output_tokens,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            top_p=top_p,
            seed=seed,
            safety_threshold=safety_threshold,
            thinking_budget=thinking_budget,
        )

        return self.client.chats.create(
            model=self.model_name,
            config=config,
        )

    def generate_chat(
        self,
        chat: object,
        prompt: str,
        *,
        files: list[dict[str, str]] | None = None,
        bucket_path: str | None = None,
        mime_type: str | None = None,
        response_schema: dict | None = None,
        response_mime_type: str = "application/json",
        temperature: float = 0.0,
        max_output_tokens: int = 8192,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        top_p: float = 1.0,
        seed: int = 20180507,
        safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
        thinking_budget: int = 0,
    ) -> dict[str, object]:
        """
        Envía un mensaje a una sesión de chat existente y obtiene una respuesta.

        Args:
            chat (object): La sesión de chat activa.
            prompt (str): El prompt de texto para el modelo.
            files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
            bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
            mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
            response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
            response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
            temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
            max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
            presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
            frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
            top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
            seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
            safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
            thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

        Returns:
            dict[str, object]: Un diccionario con la respuesta completa de la API,
                los metadatos de uso y la respuesta parseada.

        Raises:
            VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
        """
        files = self._normalize_files(files, bucket_path, mime_type)

        parts = [
            Part.from_text(text=prompt),
        ] + [
            Part.from_uri(
                file_uri=f["file_uri"],
                mime_type=f["mime_type"],
            ) for f in files
        ] 

        config = self._build_generation_config(
            response_schema=response_schema,
            response_mime_type=response_mime_type,
            temperature=temperature,
            max_output_tokens=max_output_tokens,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            top_p=top_p,
            seed=seed,
            safety_threshold=safety_threshold,
            thinking_budget=thinking_budget,
        )

        try:
            response = chat.send_message(message=parts,
                                         config=config)
        except APIError as e:
            raise VertexAIExceptionError(
                original_exception=e,
            ) from e

        usage = self.extract_usage_metadata(response)

        return {
                "response": response,
                "usage": usage,
                "parsed": self.parse_response(response),
            }

    def parse_response(self, response: object) -> dict:
        """
        Parsea la respuesta de la API para separar los 'thoughts' de las 'answers'.

        Args:
            response (object): El objeto de respuesta de la API de Genai.

        Returns:
            dict: Un diccionario con dos claves, 'thoughts' y 'answers', cada una
                conteniendo una lista de las partes correspondientes de la respuesta.
        """
        parsed = {
            "thoughts": [],
            "answers": [],
        }
        if not response.candidates or not response.candidates[0].content.parts:
            return parsed

        for part in response.candidates[0].content.parts:
            if part.thought:
                parsed["thoughts"].append(part.text)
            else:
                parsed["answers"].append(part.text)
        return parsed
Functions
__init__(model_name, system_instruction=None)

Inicializa el cliente de GenaiProcessor.

Parameters:

Name Type Description Default
model_name str

El nombre del modelo a utilizar (ej. "gemini-2.5-pro").

required
system_instruction str | None

Instrucción de sistema para guiar el comportamiento del modelo. Defaults to None.

None
Source code in app/services/agent/genai_client.py
def __init__(
    self,
    model_name: str,
    system_instruction: str | None = None,
) -> None:
    """
    Inicializa el cliente de GenaiProcessor.

    Args:
        model_name (str): El nombre del modelo a utilizar (ej. "gemini-2.5-pro").
        system_instruction (str | None, optional): Instrucción de sistema para
            guiar el comportamiento del modelo. Defaults to None.
    """
    self.client = genai.Client(
        vertexai=True,
        project=os.environ["VERTEX_PROJECT_ID"],
        location=os.environ["VERTEX_LOCATION"],
    )
    self.model_name = model_name
    self.system_instruction = system_instruction
    self._thinking_models = {"gemini-2.5-flash", "gemini-2.5-pro"}
extract_usage_metadata(response)

Extrae los metadatos de uso de la respuesta de la API.

Parameters:

Name Type Description Default
response object

El objeto de respuesta de la API de Genai.

required

Returns:

Name Type Description
dict dict

Un diccionario con el recuento de tokens (prompt, respuesta, total, y thoughts si aplica).

Source code in app/services/agent/genai_client.py
def extract_usage_metadata(self, response: object) -> dict:
    """
    Extrae los metadatos de uso de la respuesta de la API.

    Args:
        response (object): El objeto de respuesta de la API de Genai.

    Returns:
        dict: Un diccionario con el recuento de tokens (prompt, respuesta, total, y thoughts si aplica).
    """
    usage = {
        "prompt_tokens": getattr(response.usage_metadata, "prompt_token_count", None),
        "response_tokens": getattr(response.usage_metadata, "candidates_token_count", None),
        "total_tokens": getattr(response.usage_metadata, "total_token_count", None),
    }

    if self.model_name in self._thinking_models:
        usage["thoughts_tokens"] = getattr(
            response.usage_metadata, "thoughts_token_count", None,
        )

    return usage
generate_chat(chat, prompt, *, files=None, bucket_path=None, mime_type=None, response_schema=None, response_mime_type='application/json', temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)

Envía un mensaje a una sesión de chat existente y obtiene una respuesta.

Parameters:

Name Type Description Default
chat object

La sesión de chat activa.

required
prompt str

El prompt de texto para el modelo.

required
files list[dict[str, str]] | None

Lista de archivos en GCS. Defaults to None.

None
bucket_path str | None

Ruta a un único archivo en GCS. Defaults to None.

None
mime_type str | None

Tipo MIME para bucket_path. Defaults to None.

None
response_schema dict | None

Esquema de respuesta esperado. Defaults to None.

None
response_mime_type str

Tipo MIME de la respuesta. Defaults to "application/json".

'application/json'
temperature float

Controla la aleatoriedad. Defaults to 0.0.

0.0
max_output_tokens int

Límite de tokens de salida. Defaults to 8192.

8192
presence_penalty float

Penalización por presencia. Defaults to 0.0.

0.0
frequency_penalty float

Penalización por frecuencia. Defaults to 0.0.

0.0
top_p float

Muestreo nucleus. Defaults to 1.0.

1.0
seed int

Semilla para resultados deterministas. Defaults to 20180507.

20180507
safety_threshold HarmBlockThreshold

Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.

BLOCK_NONE
thinking_budget int

Presupuesto para 'thinking'. Defaults to 0.

0

Returns:

Type Description
dict[str, object]

dict[str, object]: Un diccionario con la respuesta completa de la API, los metadatos de uso y la respuesta parseada.

Raises:

Type Description
VertexAIExceptionError

Si ocurre un error en la llamada a la API de Genai.

Source code in app/services/agent/genai_client.py
def generate_chat(
    self,
    chat: object,
    prompt: str,
    *,
    files: list[dict[str, str]] | None = None,
    bucket_path: str | None = None,
    mime_type: str | None = None,
    response_schema: dict | None = None,
    response_mime_type: str = "application/json",
    temperature: float = 0.0,
    max_output_tokens: int = 8192,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    top_p: float = 1.0,
    seed: int = 20180507,
    safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
    thinking_budget: int = 0,
) -> dict[str, object]:
    """
    Envía un mensaje a una sesión de chat existente y obtiene una respuesta.

    Args:
        chat (object): La sesión de chat activa.
        prompt (str): El prompt de texto para el modelo.
        files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
        bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
        mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
        response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
        response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
        temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
        max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
        presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
        frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
        top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
        seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
        safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
        thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

    Returns:
        dict[str, object]: Un diccionario con la respuesta completa de la API,
            los metadatos de uso y la respuesta parseada.

    Raises:
        VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
    """
    files = self._normalize_files(files, bucket_path, mime_type)

    parts = [
        Part.from_text(text=prompt),
    ] + [
        Part.from_uri(
            file_uri=f["file_uri"],
            mime_type=f["mime_type"],
        ) for f in files
    ] 

    config = self._build_generation_config(
        response_schema=response_schema,
        response_mime_type=response_mime_type,
        temperature=temperature,
        max_output_tokens=max_output_tokens,
        presence_penalty=presence_penalty,
        frequency_penalty=frequency_penalty,
        top_p=top_p,
        seed=seed,
        safety_threshold=safety_threshold,
        thinking_budget=thinking_budget,
    )

    try:
        response = chat.send_message(message=parts,
                                     config=config)
    except APIError as e:
        raise VertexAIExceptionError(
            original_exception=e,
        ) from e

    usage = self.extract_usage_metadata(response)

    return {
            "response": response,
            "usage": usage,
            "parsed": self.parse_response(response),
        }
generate_from_bytes(prompt, *, pdf_bytes, mime_type='application/pdf', response_schema=None, response_mime_type='application/json', temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)

Genera contenido a partir de un prompt y los bytes de un archivo PDF.

Parameters:

Name Type Description Default
prompt str

El prompt de texto para el modelo.

required
pdf_bytes bytes

Los bytes del archivo PDF a procesar.

required
mime_type str

El tipo MIME del archivo. Defaults to "application/pdf".

'application/pdf'
response_schema dict | None

Esquema de respuesta esperado. Defaults to None.

None
response_mime_type str

Tipo MIME de la respuesta. Defaults to "application/json".

'application/json'
temperature float

Controla la aleatoriedad. Defaults to 0.0.

0.0
max_output_tokens int

Límite de tokens de salida. Defaults to 8192.

8192
presence_penalty float

Penalización por presencia. Defaults to 0.0.

0.0
frequency_penalty float

Penalización por frecuencia. Defaults to 0.0.

0.0
top_p float

Muestreo nucleus. Defaults to 1.0.

1.0
seed int

Semilla para resultados deterministas. Defaults to 20180507.

20180507
safety_threshold HarmBlockThreshold

Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.

BLOCK_NONE
thinking_budget int

Presupuesto para 'thinking'. Defaults to 0.

0

Returns:

Type Description
dict[str, object]

dict[str, object]: Un diccionario con la respuesta completa de la API, los metadatos de uso y la respuesta parseada.

Source code in app/services/agent/genai_client.py
def generate_from_bytes(
    self,
    prompt: str,
    *,
    pdf_bytes: bytes,
    mime_type: str = "application/pdf",
    response_schema: dict | None = None,
    response_mime_type: str = "application/json",
    temperature: float = 0.0,
    max_output_tokens: int = 8192,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    top_p: float = 1.0,
    seed: int = 20180507,
    safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
    thinking_budget: int = 0,
) -> dict[str, object]:
    """
    Genera contenido a partir de un prompt y los bytes de un archivo PDF.

    Args:
        prompt (str): El prompt de texto para el modelo.
        pdf_bytes (bytes): Los bytes del archivo PDF a procesar.
        mime_type (str, optional): El tipo MIME del archivo. Defaults to "application/pdf".
        response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
        response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
        temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
        max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
        presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
        frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
        top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
        seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
        safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
        thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

    Returns:
        dict[str, object]: Un diccionario con la respuesta completa de la API,
            los metadatos de uso y la respuesta parseada.
    """

    contents = [
        Part.from_bytes(
            data=pdf_bytes,
            mime_type = mime_type,
        ),
        Part.from_text(
            text=prompt,
        ),
    ]

    config = {
        "system_instruction": self.system_instruction,
        "response_mime_type": response_mime_type,
        "response_schema": response_schema,
        "temperature": temperature,
        "max_output_tokens": max_output_tokens,
        "presence_penalty": presence_penalty,
        "frequency_penalty": frequency_penalty,
        "top_p": top_p,
        "seed": seed,
        "safety_settings": self._get_safety_settings(
            safety_threshold,
        ),
    }

    if self.model_name in self._thinking_models:
        config["thinking_config"] = ThinkingConfig(
            thinking_budget=thinking_budget,
        )


    response = self.client.models.generate_content(
        model=self.model_name,
        contents=contents,
        config=GenerateContentConfig(**config),
    )

    usage = self.extract_usage_metadata(response)

    return {
        "response": response,
        "usage": usage,
        "parsed": self.parse_response(response),
    }
generate_from_uri(prompt, *, files=None, bucket_path=None, mime_type=None, response_schema=None, response_mime_type='application/json', temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)

Genera contenido a partir de un prompt y archivos especificados por URI (GCS).

Parameters:

Name Type Description Default
prompt str

El prompt de texto para el modelo.

required
files list[dict[str, str]] | None

Lista de archivos en GCS. Defaults to None.

None
bucket_path str | None

Ruta a un único archivo en GCS. Defaults to None.

None
mime_type str | None

Tipo MIME para bucket_path. Defaults to None.

None
response_schema dict | None

Esquema de respuesta esperado. Defaults to None.

None
response_mime_type str

Tipo MIME de la respuesta. Defaults to "application/json".

'application/json'
temperature float

Controla la aleatoriedad. Defaults to 0.0.

0.0
max_output_tokens int

Límite de tokens de salida. Defaults to 8192.

8192
presence_penalty float

Penalización por presencia. Defaults to 0.0.

0.0
frequency_penalty float

Penalización por frecuencia. Defaults to 0.0.

0.0
top_p float

Muestreo nucleus. Defaults to 1.0.

1.0
seed int

Semilla para resultados deterministas. Defaults to 20180507.

20180507
safety_threshold HarmBlockThreshold

Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.

BLOCK_NONE
thinking_budget int

Presupuesto para 'thinking'. Defaults to 0.

0

Returns:

Type Description
dict[str, object]

dict[str, object]: Un diccionario con la respuesta completa de la API, los metadatos de uso y la respuesta parseada.

Raises:

Type Description
VertexAIExceptionError

Si ocurre un error en la llamada a la API de Genai.

Source code in app/services/agent/genai_client.py
def generate_from_uri(
    self,
    prompt: str,
    *,
    files: list[dict[str, str]] | None = None,
    bucket_path: str | None = None,
    mime_type: str | None = None,
    response_schema: dict | None = None,
    response_mime_type: str = "application/json",
    temperature: float = 0.0,
    max_output_tokens: int = 8192,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    top_p: float = 1.0,
    seed: int = 20180507,
    safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
    thinking_budget: int = 0,
) -> dict[str, object]:
    """
    Genera contenido a partir de un prompt y archivos especificados por URI (GCS).

    Args:
        prompt (str): El prompt de texto para el modelo.
        files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
        bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
        mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
        response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
        response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
        temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
        max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
        presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
        frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
        top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
        seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
        safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
        thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

    Returns:
        dict[str, object]: Un diccionario con la respuesta completa de la API,
            los metadatos de uso y la respuesta parseada.

    Raises:
        VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
    """
    files = self._normalize_files(files, bucket_path, mime_type)

    contents = [
        Part.from_uri(
            file_uri=f["file_uri"],
            mime_type=f["mime_type"],
        ) for f in files
    ] + [
        Part.from_text(text=prompt),
    ]

    config = self._build_generation_config(
        response_schema=response_schema,
        response_mime_type=response_mime_type,
        temperature=temperature,
        max_output_tokens=max_output_tokens,
        presence_penalty=presence_penalty,
        frequency_penalty=frequency_penalty,
        top_p=top_p,
        seed=seed,
        safety_threshold=safety_threshold,
        thinking_budget=thinking_budget
    )

    try:
        response = self.client.models.generate_content(
            model=self.model_name,
            contents=contents,
            config=config,
        )
    except APIError as e:
        raise VertexAIExceptionError(
            original_exception=e,
        ) from e

    usage = self.extract_usage_metadata(response)

    return {
        "response": response,
        "usage": usage,
        "parsed": self.parse_response(response),
    }
get_chat_session(*, temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)

Crea y retorna una nueva sesión de chat con la configuración especificada.

Parameters:

Name Type Description Default
temperature float

Controla la aleatoriedad. Defaults to 0.0.

0.0
max_output_tokens int

Límite de tokens de salida. Defaults to 8192.

8192
presence_penalty float

Penalización por presencia. Defaults to 0.0.

0.0
frequency_penalty float

Penalización por frecuencia. Defaults to 0.0.

0.0
top_p float

Muestreo nucleus. Defaults to 1.0.

1.0
seed int

Semilla para resultados deterministas. Defaults to 20180507.

20180507
safety_threshold HarmBlockThreshold

Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.

BLOCK_NONE
thinking_budget int

Presupuesto para 'thinking'. Defaults to 0.

0

Returns:

Name Type Description
object object

Un objeto de sesión de chat del SDK de Genai.

Source code in app/services/agent/genai_client.py
def get_chat_session(
    self,
    *,
    temperature: float = 0.0,
    max_output_tokens: int = 8192,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    top_p: float = 1.0,
    seed: int = 20180507,
    safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
    thinking_budget: int = 0,
) -> object:
    """
    Crea y retorna una nueva sesión de chat con la configuración especificada.

    Args:
        temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
        max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
        presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
        frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
        top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
        seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
        safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
        thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.

    Returns:
        object: Un objeto de sesión de chat del SDK de Genai.
    """
    config = self._build_generation_config(
        temperature=temperature,
        max_output_tokens=max_output_tokens,
        presence_penalty=presence_penalty,
        frequency_penalty=frequency_penalty,
        top_p=top_p,
        seed=seed,
        safety_threshold=safety_threshold,
        thinking_budget=thinking_budget,
    )

    return self.client.chats.create(
        model=self.model_name,
        config=config,
    )
parse_response(response)

Parsea la respuesta de la API para separar los 'thoughts' de las 'answers'.

Parameters:

Name Type Description Default
response object

El objeto de respuesta de la API de Genai.

required

Returns:

Name Type Description
dict dict

Un diccionario con dos claves, 'thoughts' y 'answers', cada una conteniendo una lista de las partes correspondientes de la respuesta.

Source code in app/services/agent/genai_client.py
def parse_response(self, response: object) -> dict:
    """
    Parsea la respuesta de la API para separar los 'thoughts' de las 'answers'.

    Args:
        response (object): El objeto de respuesta de la API de Genai.

    Returns:
        dict: Un diccionario con dos claves, 'thoughts' y 'answers', cada una
            conteniendo una lista de las partes correspondientes de la respuesta.
    """
    parsed = {
        "thoughts": [],
        "answers": [],
    }
    if not response.candidates or not response.candidates[0].content.parts:
        return parsed

    for part in response.candidates[0].content.parts:
        if part.thought:
            parsed["thoughts"].append(part.text)
        else:
            parsed["answers"].append(part.text)
    return parsed

Mapper

Este módulo contiene funciones de utilidad para mapear y transformar datos entre los diferentes formatos y sistemas (Primus, LLM, CRM).

app.services.mapper.mapper

app.services.mapper.mapper

Functions

build_account_obj(primus_result, account_type)

Construye un objeto Accounts para un shipper o consignee.

A partir de un resultado de Primus, esta función crea un objeto Accounts que representa a un shipper (remitente) o a un consignee (destinatario), truncando los campos de nombre y calle a 40 caracteres si es necesario.

Parameters:

Name Type Description Default
primus_result dict

Un diccionario que representa un único booking de Primus.

required
account_type str

El tipo de cuenta a construir, debe ser 'shipper' o 'consignee'.

required

Returns:

Type Description
Optional[Accounts]

Optional[Accounts]: Un objeto Accounts con la información de la cuenta. Retorna None si el account_type no se encuentra en primus_result.

Source code in app/services/mapper/mapper.py
def build_account_obj(primus_result: dict, account_type: str) -> Optional[Accounts]:
    """
    Construye un objeto Accounts para un shipper o consignee.

    A partir de un resultado de Primus, esta función crea un objeto `Accounts`
    que representa a un shipper (remitente) o a un consignee (destinatario),
    truncando los campos de nombre y calle a 40 caracteres si es necesario.

    Args:
        primus_result (dict): Un diccionario que representa un único booking de Primus.
        account_type (str): El tipo de cuenta a construir, debe ser 'shipper' o 'consignee'.

    Returns:
        Optional[Accounts]: Un objeto `Accounts` con la información de la cuenta.
            Retorna `None` si el `account_type` no se encuentra en `primus_result`.
    """
    if account_type not in primus_result:
        return None

    account_data = primus_result[account_type]
    country_code_2letter = ""
    country_code_3letter = account_data.get("country", "")
    account_name = account_data.get("name", "")
    street = account_data.get("address1", "")

    if country_code_3letter == "USA":
        country_code_2letter = "US"
    elif country_code_3letter == "CAN":
        country_code_2letter = "CA"
    elif country_code_3letter == "MEX":
        country_code_2letter = "MX"

    if len(account_name) > 40:
        account_name = account_name[:40]  # Truncar a 40 caracteres si es necesario

    if len(street) > 40: 
        street = street[:40]

    return Accounts(
        name=account_name,
        street=street,
        city=account_data.get("city", ""),
        postal_code=account_data.get("zipCode", ""),
        country_code=country_code_2letter,
        state_code=account_data.get("state", ""),
        phone=account_data.get("contactPhone", ""),
    )

build_carrier_quote_object(primus_result, llm_entry, load_id, carrier_service)

Construye un objeto CarrierQuote a partir de los datos de Primus y del LLM.

Crea un objeto CarrierQuote que representa la cotización de un transportista, determinando el tipo de contrato y extrayendo los detalles relevantes de la cotización.

Parameters:

Name Type Description Default
primus_result dict

Un diccionario que representa un único booking de Primus.

required
llm_entry dict

La entrada correspondiente del procesamiento del LLM.

required
load_id str

El ID de la carga (load) a la que se asocia la cotización.

required
carrier_service dict

Un diccionario con la información del servicio del transportista, incluyendo su ID.

required

Returns:

Name Type Description
CarrierQuote CarrierQuote

Un objeto CarrierQuote con los detalles de la cotización.

Source code in app/services/mapper/mapper.py
def build_carrier_quote_object(primus_result: dict, llm_entry:dict, load_id: str, carrier_service: dict) -> CarrierQuote:
    """
    Construye un objeto CarrierQuote a partir de los datos de Primus y del LLM.

    Crea un objeto `CarrierQuote` que representa la cotización de un transportista,
    determinando el tipo de contrato y extrayendo los detalles relevantes de la
    cotización.

    Args:
        primus_result (dict): Un diccionario que representa un único booking de Primus.
        llm_entry (dict): La entrada correspondiente del procesamiento del LLM.
        load_id (str): El ID de la carga (load) a la que se asocia la cotización.
        carrier_service (dict): Un diccionario con la información del servicio del
            transportista, incluyendo su ID.

    Returns:
        CarrierQuote: Un objeto `CarrierQuote` con los detalles de la cotización.
    """
    vendor = primus_result.get("vendor", {})
    bill_to_third_party = llm_entry.get('parsed',{}).get("load",{}).get("bill_to_third_party")

    if not bill_to_third_party:
        contract = ""
    elif "BASINGEN" in bill_to_third_party:
        contract = "BA"
    else:
        contract = "GLT"

    # service_level = vendor.get("serviceLevel")
    # mode = vendor.get("mode")

    return CarrierQuote(
        carrier_service=carrier_service.get("id",""),
        service_class=f'{vendor.get("serviceLevel", "")}', # Sin el service_level, solo el tipo. 
        line_haul=vendor.get("cost"),
        quote_contract_id=f'{vendor.get("SCAC")}_{contract}:{vendor.get("quoteNumber")}',
        load_id=load_id,
        vendor_id=carrier_service.get("carrier",""),
        estimated_transit_time_days=vendor.get("transitDays"),
    )

build_contact_obj(primus_result, account_type, account_id)

Construye un objeto Contact para un shipper o consignee.

Extrae la información de contacto de un shipper o consignee desde un resultado de Primus y la utiliza para crear un objeto Contact.

Parameters:

Name Type Description Default
primus_result dict

Un diccionario que representa un único booking de Primus.

required
account_type str

El tipo de cuenta ('shipper' o 'consignee') de la cual extraer el contacto.

required
account_id str

El ID de la cuenta asociada a este contacto.

required

Returns:

Type Description
Optional[Contact]

Optional[Contact]: Un objeto Contact con la información del contacto. Retorna None si el account_type no está en primus_result o si no se encuentra un nombre de contacto.

Source code in app/services/mapper/mapper.py
def build_contact_obj(primus_result: dict, account_type: str, account_id: str) -> Optional[Contact]:
    """
    Construye un objeto Contact para un shipper o consignee.

    Extrae la información de contacto de un shipper o consignee desde un resultado
    de Primus y la utiliza para crear un objeto `Contact`.

    Args:
        primus_result (dict): Un diccionario que representa un único booking de Primus.
        account_type (str): El tipo de cuenta ('shipper' o 'consignee') de la cual
            extraer el contacto.
        account_id (str): El ID de la cuenta asociada a este contacto.

    Returns:
        Optional[Contact]: Un objeto `Contact` con la información del contacto.
            Retorna `None` si el `account_type` no está en `primus_result` o si
            no se encuentra un nombre de contacto.
    """
    if account_type not in primus_result:
        return None

    account_data = primus_result[account_type]
    full_name = account_data.get("contact", "")
    if not full_name:
        return None

    name_parts = full_name.split()
    first_name = name_parts[0]

    email_raw = account_data.get("email", "")
    first_email = email_raw.split(";")[0].strip() if email_raw else ""

    return Contact(
        first_name=first_name,
        last_name="Shipping",
        phone=account_data.get("phone", ""),
        email=first_email,
        account_id=account_id
    )

build_customer_quote_object(primus_result, load_id)

Construye un objeto CustomerQuote a partir de los datos de Primus.

Crea un objeto CustomerQuote que representa la cotización para el cliente, extrayendo la información de contabilidad y la fecha de la cotización.

Parameters:

Name Type Description Default
primus_result dict

Un diccionario que representa un único booking de Primus.

required
load_id str

El ID de la carga (load) a la que se asocia la cotización.

required

Returns:

Name Type Description
CustomerQuote CustomerQuote

Un objeto CustomerQuote con los detalles de la cotización del cliente.

Source code in app/services/mapper/mapper.py
def build_customer_quote_object(primus_result: dict, load_id: str) -> CustomerQuote:
    """
    Construye un objeto CustomerQuote a partir de los datos de Primus.

    Crea un objeto `CustomerQuote` que representa la cotización para el cliente,
    extrayendo la información de contabilidad y la fecha de la cotización.

    Args:
        primus_result (dict): Un diccionario que representa un único booking de Primus.
        load_id (str): El ID de la carga (load) a la que se asocia la cotización.

    Returns:
        CustomerQuote: Un objeto `CustomerQuote` con los detalles de la cotización
            del cliente.
    """
    accounting = primus_result.get("accountingInformation", {})
    tracking = primus_result.get("trackingInformation", {})
    net_line_haul = str(accounting.get("customerQuoteAmount"))
    quote_date = tracking.get("bookDate")
    dt = datetime.strptime(quote_date, "%Y-%m-%d %H:%M:%S")

    return CustomerQuote(
        load_id=load_id,
        net_line_haul=float(net_line_haul.replace("*", "").strip()),
        pricing="contract",
        quote_date=str(dt.date())
    )

build_final_loads(primus_data, llm_results, accounts_results, contacts_results, linear_feet_by_bol, customer_mode_id, accessorials_results)

Construye una lista de objetos FinalLoad a partir de datos procesados.

Esta función integra datos de múltiples fuentes (Primus, LLM, cuentas, contactos, etc.) para construir una lista completa de objetos FinalLoad, cada uno representando una carga final lista para ser procesada.

Parameters:

Name Type Description Default
primus_data list[dict]

Lista de datos de bookings de Primus.

required
llm_results dict[str, dict]

Resultados del procesamiento del LLM.

required
accounts_results dict

Resultados de la creación de cuentas.

required
contacts_results dict

Resultados de la creación de contactos.

required
linear_feet_by_bol dict

Datos de 'linear feet' calculados por BOL.

required
customer_mode_id dict

Mapeo de customer ID y mode ID por BOL.

required
accessorials_results dict

'Accessorials' mapeados y consolidados por BOL.

required

Returns:

Type Description
List[FinalLoad]

List[FinalLoad]: Una lista de objetos FinalLoad completamente construidos.

Source code in app/services/mapper/mapper.py
def build_final_loads(
    primus_data: list[dict],
    llm_results: dict[str,dict],
    accounts_results: dict,
    contacts_results: dict,
    linear_feet_by_bol: dict,
    customer_mode_id: dict,
    accessorials_results: dict,
) -> List[FinalLoad]:
    """
    Construye una lista de objetos FinalLoad a partir de datos procesados.

    Esta función integra datos de múltiples fuentes (Primus, LLM, cuentas, contactos, etc.)
    para construir una lista completa de objetos `FinalLoad`, cada uno representando
    una carga final lista para ser procesada.

    Args:
        primus_data (list[dict]): Lista de datos de bookings de Primus.
        llm_results (dict[str, dict]): Resultados del procesamiento del LLM.
        accounts_results (dict): Resultados de la creación de cuentas.
        contacts_results (dict): Resultados de la creación de contactos.
        linear_feet_by_bol (dict): Datos de 'linear feet' calculados por BOL.
        customer_mode_id (dict): Mapeo de customer ID y mode ID por BOL.
        accessorials_results (dict): 'Accessorials' mapeados y consolidados por BOL.

    Returns:
        List[FinalLoad]: Una lista de objetos `FinalLoad` completamente construidos.
    """
    results: list[FinalLoad] = []

    for bol in primus_data:
        bol_code = str(bol["BOLNumber"])

        # === load ===
        load = Load(
            total_weight=float(bol.get("totalWeight")),
            customer_id=customer_mode_id.get(bol_code, {}).get("customer_id"),
            mode_id=customer_mode_id.get(bol_code, {}).get("mode_id"),
            linear_feet=float(linear_feet_by_bol.get(bol_code, {}).get("total_linear_feet")),
            order_date=bol.get("trackingInformation", {}).get("pickupDateEstimated"),
        )

        # === stops ===
        stops: list[Stop] = []

        # pickup (stop 1)
        pickup_info = bol.get('pickupInformation', {})
        pickup_time_from = pickup_info.get('timeFrom', '')
        pickup_time_to = pickup_info.get('timeTo', '')
        if not pickup_time_from or not pickup_time_to:
            pickup_hours = "08:00-17:00"
        else:
            pickup_hours = f"{pickup_time_from[:5]}-{pickup_time_to[:5]}"

        stops.append(
            Stop(
                name="Stop 1",
                number=1,
                location_id=accounts_results.get("shipper",{}).get(bol_code, {}).get("id"),
                contact_info_id=contacts_results.get("shipper",{}).get(bol_code, {}).get("id"),
                shipping_receiving_hours=pickup_hours,
                expected_date=bol.get("trackingInformation", {}).get("pickupDateEstimated"),
                references=bol.get("shipper", {}).get("referenceNumber"),
                instructions=bol.get("BOLInstructions"),
                pickup_delivery_number=bol.get("vendor").get("pickupNumber",""),
                is_pickup=True,
                state_province=bol.get("shipper", {}).get("state"),
                postal_code=bol.get("shipper", {}).get("zipCode"),
                country=bol.get("shipper", {}).get("country"),
                city=bol.get("shipper", {}).get("city")
            )
        )

        # delivery (stop 2)
        delivery_info = bol.get('deliveryInformation', {})
        delivery_time_from = delivery_info.get('timeFrom', '')
        delivery_time_to = delivery_info.get('timeTo', '')
        if not delivery_time_from or not delivery_time_to:
            delivery_hours = "08:00-17:00"
        else:
            delivery_hours = f"{delivery_time_from[:5]}-{delivery_time_to[:5]}"

        stops.append(
            Stop(
                name="Stop 2",
                number=2,
                location_id=accounts_results.get("consignee",{}).get(bol_code, {}).get("id"),
                contact_info_id=contacts_results.get("consignee",{}).get(bol_code, {}).get("id"),
                shipping_receiving_hours=delivery_hours,
                expected_date=bol.get("trackingInformation", {}).get("deliveryDateEstimated"),
                references=bol.get("consignee", {}).get("referenceNumber"),
                instructions=bol.get("BOLInstructions"),
                is_dropoff=True,
                state_province=bol.get("consignee", {}).get("state"),
                postal_code=bol.get("consignee", {}).get("zipCode"),
                country=bol.get("consignee", {}).get("country"),
                city=bol.get("consignee", {}).get("city"),
            )
        )

        # === line items ===
        line_items: list[LineItem] = []
        for idx, fi in enumerate(bol.get("freightInfo", [])):
            parsed_llm = llm_results.get(bol_code,{}).get("parsed",{})
            llm_line_item = get_llm_line_item(parsed_llm, idx)

            line_items.append(
                LineItem(
                    name=f"Line Item {idx+1}",
                    description=fi.get("commodity"),
                    handling_unit_count=fi.get("qty"),
                    handling_units=MAPPER_HU_DATA.get(fi.get("dimType")),
                    height=float(fi.get("height")),
                    length=float(fi.get("length")),
                    # linear_feet=linear_feet_by_bol.get(bol_code),
                    nmfc_class=str(fi.get("class")),
                    weight=float(fi.get("weight"))*int(fi.get("qty")),
                    width=float(fi.get("width")),
                    hazardous_materials=fi.get("hazmat", False),
                    hazmat_contact=contacts_results.get("hazmat",{}).get(bol_code, {}).get("id") or "",
                    hazmat_class_division=llm_line_item.get("hazmat_class_division", "") or "",
                    hazmat_number_type=llm_line_item.get("hazmat_number_type", "") or "",
                    hazmat_number=fi.get("UN"),
                    hazmat_packaging_group=llm_line_item.get("hazmat_packaging_group", "") or "",
                    nmfc_number=fi.get("nmfc"),
                    packaging_unit_count=llm_line_item.get("packaging_unit_count", 0) or 0,
                    stackable=llm_line_item.get("stackable", False) or False,
                )
            )

        # === accessorials ===
        accessorials: list[Accessorial] = [
            Accessorial(name=acc["name"], accessorial_id=acc["accesorial_id"])
            for acc in accessorials_results.get(bol_code, {}).get('accessorials',[])
        ]

        # === FinalLoad ===
        final_load = FinalLoad(
            bol_number=bol_code,
            load=load,
            stops=stops,
            line_items=line_items,
            accessorials=accessorials,
        )

        results.append(final_load)

    return results

build_hazmat_contact_obj(primus_result, account_id)

Construye un objeto Contact para emergencias de materiales peligrosos (Hazmat).

Extrae la información de contacto de emergencia desde un resultado de Primus y crea un objeto Contact específico para Hazmat.

Parameters:

Name Type Description Default
primus_result dict

Un diccionario que representa un único booking de Primus.

required
account_id str

El ID de la cuenta asociada a este contacto.

required

Returns:

Type Description
Optional[Contact]

Optional[Contact]: Un objeto Contact con la información de contacto de emergencia. Retorna None si no se encuentra el nombre o el teléfono de contacto de emergencia.

Source code in app/services/mapper/mapper.py
def build_hazmat_contact_obj(primus_result: dict, account_id: str) -> Optional[Contact]:
    """
    Construye un objeto Contact para emergencias de materiales peligrosos (Hazmat).

    Extrae la información de contacto de emergencia desde un resultado de Primus
    y crea un objeto `Contact` específico para Hazmat.

    Args:
        primus_result (dict): Un diccionario que representa un único booking de Primus.
        account_id (str): El ID de la cuenta asociada a este contacto.

    Returns:
        Optional[Contact]: Un objeto `Contact` con la información de contacto de
            emergencia. Retorna `None` si no se encuentra el nombre o el teléfono
            de contacto de emergencia.
    """

    last_name = primus_result.get("emergencyContact", "")
    if not last_name:
        return None

    phone = primus_result.get("emergencyPhone", "")
    if not phone:
        return None

    return Contact(
        first_name="Hazmat",
        last_name=last_name,
        phone=phone,
        email="",
        account_id=account_id
    )

build_linear_feet_obj(primus_result, llm_results)

Construye un objeto LinearFeet a partir de los datos de Primus y del LLM.

Esta función extrae la información de los items de un booking de Primus y los resultados del LLM para determinar si la carga es apilable (stackable) y construir un objeto LinearFeet con los detalles de la carga.

Parameters:

Name Type Description Default
primus_result dict

Un diccionario que representa un único booking de Primus.

required
llm_results dict[str, dict]

Un diccionario con los resultados del procesamiento del LLM, usando el BOLNumber como clave.

required

Returns:

Type Description
Optional[LinearFeet]

Optional[LinearFeet]: Un objeto LinearFeet si se encuentra una coincidencia de BOL en los resultados del LLM, de lo contrario, retorna None.

Source code in app/services/mapper/mapper.py
def build_linear_feet_obj(
    primus_result: dict,
    llm_results: dict[str,dict]
) -> Optional[LinearFeet]:
    """
    Construye un objeto LinearFeet a partir de los datos de Primus y del LLM.

    Esta función extrae la información de los items de un booking de Primus y los
    resultados del LLM para determinar si la carga es apilable (stackable) y
    construir un objeto `LinearFeet` con los detalles de la carga.

    Args:
        primus_result (dict): Un diccionario que representa un único booking de Primus.
        llm_results (dict[str, dict]): Un diccionario con los resultados del
            procesamiento del LLM, usando el BOLNumber como clave.

    Returns:
        Optional[LinearFeet]: Un objeto `LinearFeet` si se encuentra una coincidencia
            de BOL en los resultados del LLM, de lo contrario, retorna `None`.
    """
    bol_number = str(primus_result["BOLNumber"])

    # Buscar el matching en LLM
    llm_entry = llm_results.get(bol_number,{})
    if not llm_entry:
        return None  # no hay matching LLM para este BOL

    # Determinar stackable: 
    # puede ser a nivel global (si todos los ítems son stackables)
    # Ejemplo: basta que al menos uno sea stackable para marcar True
    llm_line_items = llm_entry.get('parsed',{}).get("line_items",[])
    stackable = any(li.get("stackable") for li in llm_line_items)

    # Mapear freightInfo de Primus a LineItembyLinearFeet
    freight_info = primus_result.get("freightInfo", [])
    line_items = [
        LineItembyLinearFeet(
            lenght=item["length"],
            width=item["width"],
            height=item["height"],
            weight=item["weight"],
            hu_count=item["qty"]
        )
        for item in freight_info
    ]

    return LinearFeet(
        line_items=line_items,
        stackable=stackable
    )

get_llm_line_item(parsed_llm, idx)

Obtiene de forma segura un 'line item' de los resultados del LLM.

Intenta acceder a un 'line item' específico por su índice dentro de la lista de 'line_items' en los resultados procesados del LLM.

Parameters:

Name Type Description Default
parsed_llm dict

El diccionario de resultados procesados del LLM.

required
idx int

El índice del 'line item' a obtener.

required

Returns:

Name Type Description
dict

El 'line item' encontrado como un diccionario. Retorna un diccionario vacío si el índice está fuera de rango o si ocurre un error de tipo.

Source code in app/services/mapper/mapper.py
def get_llm_line_item(parsed_llm, idx):
    """
    Obtiene de forma segura un 'line item' de los resultados del LLM.

    Intenta acceder a un 'line item' específico por su índice dentro de la lista
    de 'line_items' en los resultados procesados del LLM.

    Args:
        parsed_llm (dict): El diccionario de resultados procesados del LLM.
        idx (int): El índice del 'line item' a obtener.

    Returns:
        dict: El 'line item' encontrado como un diccionario. Retorna un
            diccionario vacío si el índice está fuera de rango o si ocurre
            un error de tipo.
    """
    try:
        return parsed_llm.get("line_items", [])[idx] or {}
    except (IndexError, TypeError):
        return {}

load_mapper(jsonfilename)

Carga un archivo de configuración JSON desde el directorio actual.

Esta función construye la ruta a un archivo JSON basándose en el nombre de archivo proporcionado y lo carga en un diccionario de Python.

Parameters:

Name Type Description Default
jsonfilename str

El nombre del archivo JSON a cargar.

required

Returns:

Name Type Description
dict dict

Un diccionario con el contenido del archivo JSON.

Source code in app/services/mapper/mapper.py
def load_mapper(jsonfilename:str) -> dict:
    """
    Carga un archivo de configuración JSON desde el directorio actual.

    Esta función construye la ruta a un archivo JSON basándose en el nombre de archivo
    proporcionado y lo carga en un diccionario de Python.

    Args:
        jsonfilename (str): El nombre del archivo JSON a cargar.

    Returns:
        dict: Un diccionario con el contenido del archivo JSON.
    """
    base_dir = os.path.dirname(__file__)  # carpeta actual: app/services/mapper
    filepath = os.path.join(base_dir, jsonfilename, )
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)

map_accesorials_to_glt(all_primus_data, llm_results, mapper_data)

Mapea los 'accessorials' de Primus a los de GLT y los consolida.

Esta función procesa una lista de bookings de Primus, mapea sus 'accessorials' utilizando un diccionario de mapeo, agrega 'accessorials' adicionales basados en los resultados del LLM (ej. 'Stackable'), y elimina duplicados.

Parameters:

Name Type Description Default
all_primus_data list[dict]

Lista de bookings de Primus.

required
llm_results dict[str, dict]

Resultados del procesamiento del LLM, usando el BOLNumber como clave.

required
mapper_data dict

Diccionario para mapear los 'accessorials' de Primus a GLT.

required

Returns:

Type Description
dict[str, dict]

dict[str, dict]: Un diccionario donde cada clave es un BOLNumber y el valor es un diccionario que contiene una lista de 'accessorials' únicos para ese BOL.

Source code in app/services/mapper/mapper.py
def map_accesorials_to_glt(all_primus_data: list[dict], llm_results:dict[str,dict], mapper_data: dict) -> dict[str, dict]:
    """
    Mapea los 'accessorials' de Primus a los de GLT y los consolida.

    Esta función procesa una lista de bookings de Primus, mapea sus 'accessorials'
    utilizando un diccionario de mapeo, agrega 'accessorials' adicionales basados
    en los resultados del LLM (ej. 'Stackable'), y elimina duplicados.

    Args:
        all_primus_data (list[dict]): Lista de bookings de Primus.
        llm_results (dict[str, dict]): Resultados del procesamiento del LLM,
            usando el BOLNumber como clave.
        mapper_data (dict): Diccionario para mapear los 'accessorials' de Primus a GLT.

    Returns:
        dict[str, dict]: Un diccionario donde cada clave es un BOLNumber y el valor
            es un diccionario que contiene una lista de 'accessorials' únicos para ese BOL.
    """
    results = {}
    for booking in all_primus_data:
        bol_number = str(booking["BOLNumber"])
        parsed_llm = llm_results.get(bol_number,{}).get("parsed", {})

        accessorials_by_booking = booking["accessorials"]

        accessorials = []
        for accessorial in accessorials_by_booking: # list[str]
            accessorials.extend(mapper_data.get(accessorial,[])) # list[dict]

        for item in parsed_llm.get('line_items',[]):
            if item["stackable"]: 
                accessorials.append({
                    "name": "Stackable",
                    "accesorial_id": "a01Rc000002Fw8z"
                })
            else: 
                accessorials.append({
                    "name": "Non Stackable - Fragile",
                    "accesorial_id": "a013s00000PKXDc"
                })

        unique_accessorials = list({frozenset(d.items()): d for d in accessorials}.values())
        results[bol_number] = {'accessorials':unique_accessorials}

    return results

map_units(primus_data, llm_results)

Convierte las unidades de peso y dimensión de los datos de Primus.

Esta función itera sobre los datos de Primus y, basándose en la información extraída por el LLM, convierte el peso de KG a LBS y las dimensiones de CM a IN cuando es necesario.

Parameters:

Name Type Description Default
primus_data list[dict]

Una lista de diccionarios, donde cada diccionario representa un booking de Primus.

required
llm_results dict[str, dict]

Un diccionario con los resultados del procesamiento del LLM, usando el BOLNumber como clave.

required

Returns:

Type Description

list[dict]: La lista de bookings de Primus con las unidades de peso y dimensión convertidas.

Source code in app/services/mapper/mapper.py
def map_units(primus_data: list[dict], llm_results:dict[str,dict]):
    """
    Convierte las unidades de peso y dimensión de los datos de Primus.

    Esta función itera sobre los datos de Primus y, basándose en la información
    extraída por el LLM, convierte el peso de KG a LBS y las dimensiones de CM a IN
    cuando es necesario.

    Args:
        primus_data (list[dict]): Una lista de diccionarios, donde cada diccionario
            representa un booking de Primus.
        llm_results (dict[str, dict]): Un diccionario con los resultados del
            procesamiento del LLM, usando el BOLNumber como clave.

    Returns:
        list[dict]: La lista de bookings de Primus con las unidades de peso y
            dimensión convertidas.
    """
    for booking in primus_data:
        bol_number = str(booking["BOLNumber"])
        parsed_llm = llm_results.get(bol_number,{}).get("parsed", {})

        load_weight_units = parsed_llm.get("load", {}).get("weight_units", "LB")
        line_item_dimension_units = any(li.get("dimension_units") == "CM" for li in parsed_llm.get("line_items", []))

        if load_weight_units == "KG":
            for fi in booking.get("freightInfo", []):
                weight_in_lb = float(fi.get("weight", 0)) * 2.205 # Convertir KG a LBS
                fi["weight"] = math.ceil(weight_in_lb)
            total_weight_in_lb = float(booking.get("totalWeight", 0)) * 2.205 # Convertir KG a LBS
            booking["totalWeight"] = math.ceil(total_weight_in_lb)

        if line_item_dimension_units:
            for fi in booking.get("freightInfo", []):
                length_in = float(fi.get("length", 0)) * 0.393701 # Convertir CM a IN
                width_in = float(fi.get("width", 0)) * 0.393701
                height_in = float(fi.get("height", 0)) * 0.393701
                fi["length"] = math.ceil(length_in)
                fi["width"] = math.ceil(width_in) 
                fi["height"] = math.ceil(height_in)   

    return primus_data

CRM Client

Cliente para interactuar con la API del CRM (Salesforce). Se encarga de crear y actualizar cuentas, contactos, cargas (loads) y cotizaciones.

app.services.crm_client

app.services.crm_client

Classes

CRMClient

Cliente para interactuar con la API de CRM.

Esta clase maneja la autenticación y las solicitudes a la API de CRM para crear y actualizar cuentas, contactos, cargas y cotizaciones.

Source code in app/services/crm_client.py
class CRMClient:
    """
    Cliente para interactuar con la API de CRM.

    Esta clase maneja la autenticación y las solicitudes a la API de CRM
    para crear y actualizar cuentas, contactos, cargas y cotizaciones.
    """
    def __init__(self):
        """
        Inicializa el cliente de CRM.

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor.
        """
        self.base_url = settings.crm_base_url
        self.user = settings.crm_user
        self.password = settings.crm_password
        self.timeout = settings.http_client_timeout_seconds
        self.semaphore = asyncio.Semaphore(settings.concurrency)
        self.token = None


    async def login(self):
        """
        Autentica con el CRM y guarda el token de acceso.

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor, pero almacena el token de acceso en self.token.
            Levanta una excepción si la autenticación falla.
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.post(
                f"{self.base_url}/auth/access-token-v2",
                data={
                    "grant_type": "password",
                    "username": self.user,
                    "password": self.password,
                    "scope": "",
                    "client_id": "",
                    "client_secret": ""
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            resp.raise_for_status()
            self.token = resp.json().get("access_token")
            logger.info("CRM login OK, nuevo token obtenido")

    def _headers(self) -> dict:
        """
        Genera los encabezados de autorización para las solicitudes al CRM.

        Args:
            No recibe argumentos.

        Returns:
            dict: Un diccionario con los encabezados de autorización y tipo de contenido.
        """
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"}


    async def _request_with_reauth(self, method: str, path: str, **kwargs) -> httpx.Response:
        """
        Envía una solicitud HTTP al CRM, con reintento automático si el token está expirado.

        Args:
            method (str): El método HTTP a utilizar (GET, POST, PATCH, etc.).
            path (str): La ruta del endpoint del CRM.
            **kwargs: Argumentos adicionales para la solicitud httpx.request.

        Returns:
            httpx.Response: La respuesta de la solicitud HTTP.
            Levanta una excepción si la solicitud falla después de los reintentos.
        """
        if not self.token:
            await self.login()

        url = f"{self.base_url}/{path.lstrip('/')}"
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            resp = await client.request(method, url, headers=self._headers(), **kwargs)

            if resp.status_code in (401, 403):
                logger.warning("Token expirado o inválido, reintentando login...")
                await self.login()
                resp = await client.request(method, url, headers=self._headers(), **kwargs)

            resp.raise_for_status()
            return resp

    # -------------------------------
    # Métodos usando _request_with_reauth
    # -------------------------------


    async def _post_resource(self, bol_number: str, role: str, resource: Accounts | Contact, url_path: str):
        """
        Envía una solicitud POST para crear un recurso (Cuenta o Contacto) en el CRM.
        Incluye lógica de reintento para contactos con un número de teléfono predeterminado en caso de error.

        Args:
            bol_number (str): El número de BOL asociado al recurso.
            role (str): El rol del recurso (e.g., "shipper", "consignee", "hazmat").
            resource (Accounts | Contact): El objeto Pydantic del recurso a crear.
            url_path (str): La ruta del endpoint para crear el recurso (e.g., "account", "contact").

        Returns:
            tuple: Una tupla que contiene el número de BOL, el rol y un diccionario con el ID del recurso creado.
                   Si falla, retorna un diccionario con un ID vacío.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth(
                    "POST",
                    url_path,
                    json=resource.model_dump(),
                )
                return bol_number, role, {"id": resp.json().get("id")}
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error creando ({role}) para BOL {bol_number}: {e} {error_detail}")

                if isinstance(resource, Contact):
                    tp = "Contact"
                else:
                    tp = "Account"

                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error creando ({tp}-{role}) para BOL {bol_number}: {e} {error_detail}"
                    }
                )

                if isinstance(resource, Contact):
                    try:
                        logger.warning(f"Reintentando creación de Contact para BOL {bol_number} con phone 3057260275")
                        new_resource = resource.model_copy(update={"phone": "3057260275"})
                        resp = await self._request_with_reauth(
                            "POST",
                            url_path,
                            json=new_resource.model_dump(),
                        )
                        return bol_number, role, {"id": resp.json().get("id")}
                    except Exception as e2:
                        error_detail2 = ""
                        if hasattr(e2, "response") and e2.response is not None:
                            try:
                                error_json = e2.response.json()
                                error_detail2 = error_json.get("detail", str(error_json))
                            except Exception:
                                error_detail2 = e2.response.text
                        else:
                            error_detail2 = str(e2)

                        logger.exception(f"Segundo intento fallido creando Contact para BOL {bol_number}: {e2} {error_detail2}")
                        email_service.send_email(
                            dynamic_data={
                                "bol_number": bol_number,
                                "error_message": f"Error creando (Contact) para BOL {bol_number}: {e2} {error_detail2}"
                            }
                        )

                return bol_number, role, {"id": ""}


    async def create_accounts_id(self, primus_results: list):
        """
        Crea cuentas en el CRM para los shippers y consignees basados en los resultados de Primus.

        Args:
            primus_results (list): Una lista de diccionarios con los resultados de Primus.

        Returns:
            dict: Un diccionario que contiene los IDs de las cuentas creadas,
                  separados por "shipper" y "consignee", indexados por número de BOL.
        """
        tasks = []
        for primus_result in primus_results:
            bol_number = str(primus_result["BOLNumber"])
            for role in ("shipper", "consignee"):
                account = build_account_obj(primus_result, role)
                if account:
                    tasks.append(self._post_resource(bol_number, role, account, "account"))
                else:
                    logger.warning(f"No se pudo construir Account {role} para BOL {bol_number}, se omite")
        results = await asyncio.gather(*tasks)

        return {"shipper": {bol: acc for bol, role, acc in results if role == "shipper"},
                "consignee": {bol: acc for bol, role, acc in results if role == "consignee"}}


    async def create_contacts_id(self, primus_results: list, accounts_mapping: dict):
        """
        Crea contactos en el CRM para los shippers, consignees y contactos hazmat.

        Args:
            primus_results (list): Una lista de diccionarios con los resultados de Primus.
            accounts_mapping (dict): Un diccionario con los IDs de las cuentas ya creadas.

        Returns:
            dict: Un diccionario que contiene los IDs de los contactos creados,
                  separados por "shipper", "consignee" y "hazmat", indexados por número de BOL.
        """
        tasks = []
        for primus_result in primus_results:
            bol_number = str(primus_result["BOLNumber"])
            for role in ("shipper", "consignee"):
                account_id = accounts_mapping[role].get(bol_number, {}).get("id")
                if account_id:
                    contact = build_contact_obj(primus_result, role, account_id)
                    if contact:
                        tasks.append(self._post_resource(bol_number, role, contact, "contact"))

            # hazmat
            if evaluate_if_need_hazmat_contact(primus_result):
                consignee_id = accounts_mapping["consignee"].get(bol_number, {}).get("id")
                if consignee_id:
                    hazmat_contact = build_hazmat_contact_obj(primus_result, consignee_id)
                    if hazmat_contact:
                        tasks.append(self._post_resource(bol_number, "hazmat", hazmat_contact, "contact"))

        results = await asyncio.gather(*tasks)
        out = {"shipper": {}, "consignee": {}, "hazmat": {}}
        for bol, role, cont in results:
            out[role][bol] = cont
        return out


    async def _get_customer_id(self, bol_number: str, third_party: str):
        """
        Obtiene el ID de un cliente (customer) del CRM basado en el third_party.

        Args:
            bol_number (str): El número de BOL asociado.
            third_party (str): El identificador del tercer partido.

        Returns:
            tuple: Una tupla que contiene el número de BOL y un diccionario con el ID del cliente.
                   Si falla, retorna un diccionario con un ID vacío.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth("GET", f"account/tai/{third_party}")
                response = resp.json()
                return bol_number, {"customer_id": response[0].get("id")}
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error obteniendo customer_id para BOL {bol_number}: {e} - {error_detail}")
                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error obteniendo customer_id para BOL {bol_number}: {e} - {error_detail}"
                    }
                )
                return bol_number, {"customer_id": ""}


    async def _get_mode_id(self, bol_number: str, mode: str):
        """
        Obtiene el ID de un modo de transporte (mode) del CRM.

        Args:
            bol_number (str): El número de BOL asociado.
            mode (str): El nombre del modo de transporte.

        Returns:
            tuple: Una tupla que contiene el número de BOL y un diccionario con el ID del modo.
                   Si falla, retorna un diccionario con un ID vacío.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth("GET", "mode", params={"department": mode})
                response = resp.json()
                return bol_number, {"mode_id": next((item["id"] for item in response if item["name"] == mode), "")}
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error obteniendo mode_id para BOL {bol_number}: {e} - {error_detail}")
                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error obteniendo mode_id para BOL {bol_number}: {e} - {error_detail}"
                    }
                )
                return bol_number, {"mode_id": ""}


    async def get_customer_and_mode_id(self, primus_results: list):
        """
        Obtiene los IDs de cliente y modo de transporte para cada resultado de Primus.

        Args:
            primus_results (list): Una lista de diccionarios con los resultados de Primus.

        Returns:
            dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
                  que contiene "customer_id" y/o "mode_id".
        """
        tasks = []
        for primus_result in primus_results:
            bol_number = str(primus_result["BOLNumber"])
            third_party = primus_result.get("thirdParty", {}).get("id")
            mode = primus_result.get("vendor", {}).get("mode")

            if not third_party and not mode:
                logger.warning(f"No se encontró thirdParty ni mode en BOL {bol_number}, se omite")
                continue

            if third_party:
                tasks.append(self._get_customer_id(bol_number, third_party))
            if mode:
                tasks.append(self._get_mode_id(bol_number, mode))
        results = await asyncio.gather(*tasks)

        output: dict[str, dict[str, str | None]] = {}
        for bol_number, result in results:
            if bol_number not in output:
                output[bol_number] = {}
            output[bol_number].update(result)
        return output


    async def _post_load(self, bol_number: str, final_load: FinalLoad):
        """
        Crea una carga (load) en el CRM.

        Args:
            bol_number (str): El número de BOL asociado a la carga.
            final_load (FinalLoad): El objeto Pydantic con los datos finales de la carga.

        Returns:
            tuple: Una tupla que contiene el número de BOL y un diccionario con el ID de la carga creada.
                   Si falla, retorna un diccionario con un ID None.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth(
                    "POST",
                    "load/v2",
                    json=final_load.model_dump()
                )
                return bol_number, {"id": resp.json().get("id")}
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error creando load para BOL {bol_number}: {e} - {error_detail}")
                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error creando load para BOL {bol_number}: {e} - {error_detail}"
                    }
                )
                return bol_number, {"id": None}


    async def create_loads(self, final_loads: list[FinalLoad]):
        """
        Crea múltiples cargas en el CRM.

        Args:
            final_loads (list[FinalLoad]): Una lista de objetos Pydantic con los datos finales de las cargas.

        Returns:
            dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
                  con el ID de la carga creada.
        """
        tasks = [self._post_load(fl.bol_number, fl) for fl in final_loads]
        results = await asyncio.gather(*tasks)
        return {bol: load_out for bol, load_out in results}


    async def _post_carrier_quote(self, bol_number: str, carrier_quote: CarrierQuote):
        """
        Crea una cotización de transportista (carrier quote) en el CRM.

        Args:
            bol_number (str): El número de BOL asociado a la cotización.
            carrier_quote (CarrierQuote): El objeto Pydantic con los datos de la cotización del transportista.

        Returns:
            No retorna ningún valor. Envía un correo electrónico si falla la creación.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth("POST", "carrier-quote", json=carrier_quote.model_dump())
                logger.info(f"Carrier quote creado para BOL {bol_number}: {resp.json()}")
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error creando Carrier quote para BOL {bol_number}: {e} - {error_detail}")
                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error creando Carrier quote para BOL {bol_number}: {e} - {error_detail}"
                    }
                )


    async def _post_customer_quote(self, bol_number: str, customer_quote: CustomerQuote):
        """
        Crea una cotización de cliente (customer quote) en el CRM.

        Args:
            bol_number (str): El número de BOL asociado a la cotización.
            customer_quote (CustomerQuote): El objeto Pydantic con los datos de la cotización del cliente.

        Returns:
            No retorna ningún valor. Envía un correo electrónico si falla la creación.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth("POST", "customer-quote", json=customer_quote.model_dump())
                logger.info(f"Customer quote creado para BOL {bol_number}: {resp.json()}")
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error creando Customer quote para BOL {bol_number}: {e} - {error_detail}")
                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error creando Customer quote para BOL {bol_number}: {e} - {error_detail}"
                    }
                )


    async def create_quotes(self, primus_results, llm_results, loads_mapping, carrier_services):
        """
        Crea cotizaciones de transportista y cliente para múltiples resultados de Primus.

        Args:
            primus_results (list): Una lista de diccionarios con los resultados de Primus.
            llm_results (dict): Un diccionario con los resultados del LLM, indexados por número de BOL.
            loads_mapping (dict): Un diccionario con los IDs de las cargas ya creadas, indexados por número de BOL.
            carrier_services (dict): Un diccionario con los servicios de transportista, indexados por número de BOL.

        Returns:
            No retorna ningún valor.
        """
        tasks = []
        for primus_result in primus_results:
            bol_number = str(primus_result["BOLNumber"])
            load_id = loads_mapping.get(bol_number, {}).get("id")
            if not load_id:
                continue
            llm_entry = llm_results.get(bol_number, {})
            carrier_service = carrier_services.get(bol_number, {})
            carrier_quote = build_carrier_quote_object(primus_result, llm_entry, load_id, carrier_service)
            customer_quote = build_customer_quote_object(primus_result, load_id)
            tasks.append(self._post_carrier_quote(bol_number, carrier_quote))
            tasks.append(self._post_customer_quote(bol_number, customer_quote))
        await asyncio.gather(*tasks)
        logger.info("Todos los CarrierQuotes y CustomerQuotes fueron procesados.")


    async def _get_carrier_service_by_scacs(self, bol_number: str, scac: str):
        """
        Obtiene el servicio de transportista (carrier service) del CRM por su código SCAC.

        Args:
            bol_number (str): El número de BOL asociado.
            scac (str): El código SCAC del transportista.

        Returns:
            tuple: Una tupla que contiene el número de BOL y un diccionario con la información del servicio de transportista.
                   Si falla, retorna un diccionario vacío.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth("GET", "carrier-service/primus/scacs", params={"scacs": [scac]})
                carriers = resp.json()
                if scac == "FDEG":
                    carrier_service = next((item for item in carriers if item["name"] == "FEDEX PACKAGING"), {})
                else:
                    carrier_service = next((item for item in carriers if item["name"].endswith("(LTL)") ), None)
                    if not carrier_service:
                        project44_carriers = [item for item in carriers if item.get("api_provider") == "Project44"]
                        if len(project44_carriers) == 1:
                            carrier_service = project44_carriers[0]
                        elif len(project44_carriers) > 1:
                            carrier_service = next(
                                (item for item in project44_carriers if "LTL" in item["name"]),
                                project44_carriers[0]
                            )
                        else:
                            carrier_service = {}
                return bol_number, carrier_service
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error consultando carrier SCAC={scac}: {e} - {error_detail}")
                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error consultando carrier SCAC={scac}: {e} - {error_detail}"
                    }
                )
                return bol_number, {}


    async def carrier_service(self, primus_results: list[dict]):
        """
        Obtiene los servicios de transportista para múltiples resultados de Primus.

        Args:
            primus_results (list[dict]): Una lista de diccionarios con los resultados de Primus.

        Returns:
            dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
                  con la información del servicio de transportista.
        """
        results = await asyncio.gather(
            *[self._get_carrier_service_by_scacs(str(p["BOLNumber"]),
                SCACS_MAPPING.get(p.get("vendor",{}).get("SCAC"),
                p.get("vendor",{}).get("SCAC")))
                for p in primus_results]
        )
        return {bol: cs for bol, cs in results}


    async def _get_account_by_name(self, bol_number: str, account_name: str):
        """
        Obtiene el ID de una cuenta del CRM por su nombre.

        Args:
            bol_number (str): El número de BOL asociado.
            account_name (str): El nombre de la cuenta a buscar.

        Returns:
            tuple: Una tupla que contiene el número de BOL y un diccionario con el ID de la cuenta.
                   Si falla, retorna un diccionario con un ID None.
        """
        async with self.semaphore:
            try:
                resp = await self._request_with_reauth("GET", "account/name",
                params={"account_name": account_name, "billing_account": True})
                id = next((item["id"] for item in resp.json() if item["name"].lower() == account_name.lower()), None)
                return bol_number, {"id": id}
            except Exception as e:
                error_detail = ""
                if hasattr(e, "response") and e.response is not None:
                    try:
                        error_json = e.response.json()
                        error_detail = error_json.get("detail", str(error_json))
                    except Exception:
                        error_detail = e.response.text
                else:
                    error_detail = str(e)

                logger.exception(f"Error consultando account={account_name}: {e} - {error_detail}")
                email_service.send_email(
                    dynamic_data={
                        "bol_number": bol_number,
                        "error_message": f"Error consultando account={account_name}: {e} - {error_detail}"
                    }
                )
                return bol_number, {"id": None}


    async def create_carrier_bill_id(self, llm_results: dict[str,dict]):
        """
        Crea los IDs de facturación del transportista (carrier bill ID) basándose en los resultados del LLM.

        Args:
            llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.

        Returns:
            dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
                  con el ID de la cuenta de facturación del transportista.
        """
        results = []
        for bol_number, bol_data in llm_results.items():
            bill_to_third_party = bol_data.get("parsed",{}).get("load",{}).get("bill_to_third_party")
            logger.info(f"BOL {bol_number}, bill_to_third_party: {bill_to_third_party}")
            if bill_to_third_party:
                results.append(await self._get_account_by_name(bol_number, bill_to_third_party))
        return {bol: out for bol, out in results}


    async def _patch_load(self, bol_number: str, load_id: str, updates_data: dict):
        """
        Actualiza una carga (load) existente en el CRM con los datos proporcionados.

        Args:
            bol_number (str): El número de BOL asociado a la carga.
            load_id (str): El ID de la carga a actualizar.
            updates_data (dict): Un diccionario con los datos a actualizar en la carga.

        Returns:
            No retorna ningún valor. Envía un correo electrónico si falla la actualización.
        """
        payload = {
            "billing_reference": bol_number,
            "po_number": updates_data.get("po_number"),
            "pro_number": updates_data.get("pro_number"),
            "carrier_bill_id": updates_data.get("carrier_bill_id"),
            "carrier_id": updates_data.get("carrier_id"),
            "document_options_id": updates_data.get("document_options_id"),
            "external_tms_integration": updates_data.get("external_tms_integration")
        }
        if "mode_id" in updates_data:
            payload["mode_id"] = updates_data["mode_id"]
        try:
            await self._request_with_reauth("PATCH", f"load/{load_id}", json=payload)
            logger.info(f"[OK] actualización exitosa para BOL {bol_number} (load_id={load_id})")
        except Exception as e:
            error_detail = ""
            if hasattr(e, "response") and e.response is not None:
                try:
                    error_json = e.response.json()
                    error_detail = error_json.get("detail", str(error_json))
                except Exception:
                    error_detail = e.response.text
            else:
                error_detail = str(e)

            logger.exception(f"[ERROR] Falló actualización de datos para BOL {bol_number} (load_id={load_id}): {e} - {error_detail}")
            email_service.send_email(
                dynamic_data={
                    "bol_number": bol_number,
                    "error_message": f"[ERROR] Falló actualización de datos para BOL {bol_number} (load_id={load_id}): {e} - {error_detail}"
                }
            )


    async def update_loads(self, loads_id: dict, updates_data: dict):
        """
        Actualiza múltiples cargas en el CRM.

        Args:
            loads_id (dict): Un diccionario con los IDs de las cargas a actualizar, indexados por número de BOL.
            updates_data (dict): Un diccionario con los datos de actualización para cada carga, indexados por número de BOL.

        Returns:
            No retorna ningún valor.
        """
        tasks = []
        for bol_number, load_data in loads_id.items():
            load_id = load_data.get("id")
            data = updates_data.get(bol_number, {})
            if load_id and data:
                tasks.append(self._patch_load(bol_number, load_id, data))

        await asyncio.gather(*tasks)
        logger.info("Actualización de loads finalizada.")
Functions
__init__()

Inicializa el cliente de CRM.

Returns:

Type Description

No retorna ningún valor.

Source code in app/services/crm_client.py
def __init__(self):
    """
    Inicializa el cliente de CRM.

    Args:
        No recibe argumentos.

    Returns:
        No retorna ningún valor.
    """
    self.base_url = settings.crm_base_url
    self.user = settings.crm_user
    self.password = settings.crm_password
    self.timeout = settings.http_client_timeout_seconds
    self.semaphore = asyncio.Semaphore(settings.concurrency)
    self.token = None
carrier_service(primus_results) async

Obtiene los servicios de transportista para múltiples resultados de Primus.

Parameters:

Name Type Description Default
primus_results list[dict]

Una lista de diccionarios con los resultados de Primus.

required

Returns:

Name Type Description
dict

Un diccionario donde la clave es el número de BOL y el valor es un diccionario con la información del servicio de transportista.

Source code in app/services/crm_client.py
async def carrier_service(self, primus_results: list[dict]):
    """
    Obtiene los servicios de transportista para múltiples resultados de Primus.

    Args:
        primus_results (list[dict]): Una lista de diccionarios con los resultados de Primus.

    Returns:
        dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
              con la información del servicio de transportista.
    """
    results = await asyncio.gather(
        *[self._get_carrier_service_by_scacs(str(p["BOLNumber"]),
            SCACS_MAPPING.get(p.get("vendor",{}).get("SCAC"),
            p.get("vendor",{}).get("SCAC")))
            for p in primus_results]
    )
    return {bol: cs for bol, cs in results}
create_accounts_id(primus_results) async

Crea cuentas en el CRM para los shippers y consignees basados en los resultados de Primus.

Parameters:

Name Type Description Default
primus_results list

Una lista de diccionarios con los resultados de Primus.

required

Returns:

Name Type Description
dict

Un diccionario que contiene los IDs de las cuentas creadas, separados por "shipper" y "consignee", indexados por número de BOL.

Source code in app/services/crm_client.py
async def create_accounts_id(self, primus_results: list):
    """
    Crea cuentas en el CRM para los shippers y consignees basados en los resultados de Primus.

    Args:
        primus_results (list): Una lista de diccionarios con los resultados de Primus.

    Returns:
        dict: Un diccionario que contiene los IDs de las cuentas creadas,
              separados por "shipper" y "consignee", indexados por número de BOL.
    """
    tasks = []
    for primus_result in primus_results:
        bol_number = str(primus_result["BOLNumber"])
        for role in ("shipper", "consignee"):
            account = build_account_obj(primus_result, role)
            if account:
                tasks.append(self._post_resource(bol_number, role, account, "account"))
            else:
                logger.warning(f"No se pudo construir Account {role} para BOL {bol_number}, se omite")
    results = await asyncio.gather(*tasks)

    return {"shipper": {bol: acc for bol, role, acc in results if role == "shipper"},
            "consignee": {bol: acc for bol, role, acc in results if role == "consignee"}}
create_carrier_bill_id(llm_results) async

Crea los IDs de facturación del transportista (carrier bill ID) basándose en los resultados del LLM.

Parameters:

Name Type Description Default
llm_results dict[str, dict]

Un diccionario con los resultados del LLM, indexados por número de BOL.

required

Returns:

Name Type Description
dict

Un diccionario donde la clave es el número de BOL y el valor es un diccionario con el ID de la cuenta de facturación del transportista.

Source code in app/services/crm_client.py
async def create_carrier_bill_id(self, llm_results: dict[str,dict]):
    """
    Crea los IDs de facturación del transportista (carrier bill ID) basándose en los resultados del LLM.

    Args:
        llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.

    Returns:
        dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
              con el ID de la cuenta de facturación del transportista.
    """
    results = []
    for bol_number, bol_data in llm_results.items():
        bill_to_third_party = bol_data.get("parsed",{}).get("load",{}).get("bill_to_third_party")
        logger.info(f"BOL {bol_number}, bill_to_third_party: {bill_to_third_party}")
        if bill_to_third_party:
            results.append(await self._get_account_by_name(bol_number, bill_to_third_party))
    return {bol: out for bol, out in results}
create_contacts_id(primus_results, accounts_mapping) async

Crea contactos en el CRM para los shippers, consignees y contactos hazmat.

Parameters:

Name Type Description Default
primus_results list

Una lista de diccionarios con los resultados de Primus.

required
accounts_mapping dict

Un diccionario con los IDs de las cuentas ya creadas.

required

Returns:

Name Type Description
dict

Un diccionario que contiene los IDs de los contactos creados, separados por "shipper", "consignee" y "hazmat", indexados por número de BOL.

Source code in app/services/crm_client.py
async def create_contacts_id(self, primus_results: list, accounts_mapping: dict):
    """
    Crea contactos en el CRM para los shippers, consignees y contactos hazmat.

    Args:
        primus_results (list): Una lista de diccionarios con los resultados de Primus.
        accounts_mapping (dict): Un diccionario con los IDs de las cuentas ya creadas.

    Returns:
        dict: Un diccionario que contiene los IDs de los contactos creados,
              separados por "shipper", "consignee" y "hazmat", indexados por número de BOL.
    """
    tasks = []
    for primus_result in primus_results:
        bol_number = str(primus_result["BOLNumber"])
        for role in ("shipper", "consignee"):
            account_id = accounts_mapping[role].get(bol_number, {}).get("id")
            if account_id:
                contact = build_contact_obj(primus_result, role, account_id)
                if contact:
                    tasks.append(self._post_resource(bol_number, role, contact, "contact"))

        # hazmat
        if evaluate_if_need_hazmat_contact(primus_result):
            consignee_id = accounts_mapping["consignee"].get(bol_number, {}).get("id")
            if consignee_id:
                hazmat_contact = build_hazmat_contact_obj(primus_result, consignee_id)
                if hazmat_contact:
                    tasks.append(self._post_resource(bol_number, "hazmat", hazmat_contact, "contact"))

    results = await asyncio.gather(*tasks)
    out = {"shipper": {}, "consignee": {}, "hazmat": {}}
    for bol, role, cont in results:
        out[role][bol] = cont
    return out
create_loads(final_loads) async

Crea múltiples cargas en el CRM.

Parameters:

Name Type Description Default
final_loads list[FinalLoad]

Una lista de objetos Pydantic con los datos finales de las cargas.

required

Returns:

Name Type Description
dict

Un diccionario donde la clave es el número de BOL y el valor es un diccionario con el ID de la carga creada.

Source code in app/services/crm_client.py
async def create_loads(self, final_loads: list[FinalLoad]):
    """
    Crea múltiples cargas en el CRM.

    Args:
        final_loads (list[FinalLoad]): Una lista de objetos Pydantic con los datos finales de las cargas.

    Returns:
        dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
              con el ID de la carga creada.
    """
    tasks = [self._post_load(fl.bol_number, fl) for fl in final_loads]
    results = await asyncio.gather(*tasks)
    return {bol: load_out for bol, load_out in results}
create_quotes(primus_results, llm_results, loads_mapping, carrier_services) async

Crea cotizaciones de transportista y cliente para múltiples resultados de Primus.

Parameters:

Name Type Description Default
primus_results list

Una lista de diccionarios con los resultados de Primus.

required
llm_results dict

Un diccionario con los resultados del LLM, indexados por número de BOL.

required
loads_mapping dict

Un diccionario con los IDs de las cargas ya creadas, indexados por número de BOL.

required
carrier_services dict

Un diccionario con los servicios de transportista, indexados por número de BOL.

required

Returns:

Type Description

No retorna ningún valor.

Source code in app/services/crm_client.py
async def create_quotes(self, primus_results, llm_results, loads_mapping, carrier_services):
    """
    Crea cotizaciones de transportista y cliente para múltiples resultados de Primus.

    Args:
        primus_results (list): Una lista de diccionarios con los resultados de Primus.
        llm_results (dict): Un diccionario con los resultados del LLM, indexados por número de BOL.
        loads_mapping (dict): Un diccionario con los IDs de las cargas ya creadas, indexados por número de BOL.
        carrier_services (dict): Un diccionario con los servicios de transportista, indexados por número de BOL.

    Returns:
        No retorna ningún valor.
    """
    tasks = []
    for primus_result in primus_results:
        bol_number = str(primus_result["BOLNumber"])
        load_id = loads_mapping.get(bol_number, {}).get("id")
        if not load_id:
            continue
        llm_entry = llm_results.get(bol_number, {})
        carrier_service = carrier_services.get(bol_number, {})
        carrier_quote = build_carrier_quote_object(primus_result, llm_entry, load_id, carrier_service)
        customer_quote = build_customer_quote_object(primus_result, load_id)
        tasks.append(self._post_carrier_quote(bol_number, carrier_quote))
        tasks.append(self._post_customer_quote(bol_number, customer_quote))
    await asyncio.gather(*tasks)
    logger.info("Todos los CarrierQuotes y CustomerQuotes fueron procesados.")
get_customer_and_mode_id(primus_results) async

Obtiene los IDs de cliente y modo de transporte para cada resultado de Primus.

Parameters:

Name Type Description Default
primus_results list

Una lista de diccionarios con los resultados de Primus.

required

Returns:

Name Type Description
dict

Un diccionario donde la clave es el número de BOL y el valor es un diccionario que contiene "customer_id" y/o "mode_id".

Source code in app/services/crm_client.py
async def get_customer_and_mode_id(self, primus_results: list):
    """
    Obtiene los IDs de cliente y modo de transporte para cada resultado de Primus.

    Args:
        primus_results (list): Una lista de diccionarios con los resultados de Primus.

    Returns:
        dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
              que contiene "customer_id" y/o "mode_id".
    """
    tasks = []
    for primus_result in primus_results:
        bol_number = str(primus_result["BOLNumber"])
        third_party = primus_result.get("thirdParty", {}).get("id")
        mode = primus_result.get("vendor", {}).get("mode")

        if not third_party and not mode:
            logger.warning(f"No se encontró thirdParty ni mode en BOL {bol_number}, se omite")
            continue

        if third_party:
            tasks.append(self._get_customer_id(bol_number, third_party))
        if mode:
            tasks.append(self._get_mode_id(bol_number, mode))
    results = await asyncio.gather(*tasks)

    output: dict[str, dict[str, str | None]] = {}
    for bol_number, result in results:
        if bol_number not in output:
            output[bol_number] = {}
        output[bol_number].update(result)
    return output
login() async

Autentica con el CRM y guarda el token de acceso.

Returns:

Type Description

No retorna ningún valor, pero almacena el token de acceso en self.token.

Levanta una excepción si la autenticación falla.

Source code in app/services/crm_client.py
async def login(self):
    """
    Autentica con el CRM y guarda el token de acceso.

    Args:
        No recibe argumentos.

    Returns:
        No retorna ningún valor, pero almacena el token de acceso en self.token.
        Levanta una excepción si la autenticación falla.
    """
    async with httpx.AsyncClient(timeout=self.timeout) as client:
        resp = await client.post(
            f"{self.base_url}/auth/access-token-v2",
            data={
                "grant_type": "password",
                "username": self.user,
                "password": self.password,
                "scope": "",
                "client_id": "",
                "client_secret": ""
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        resp.raise_for_status()
        self.token = resp.json().get("access_token")
        logger.info("CRM login OK, nuevo token obtenido")
update_loads(loads_id, updates_data) async

Actualiza múltiples cargas en el CRM.

Parameters:

Name Type Description Default
loads_id dict

Un diccionario con los IDs de las cargas a actualizar, indexados por número de BOL.

required
updates_data dict

Un diccionario con los datos de actualización para cada carga, indexados por número de BOL.

required

Returns:

Type Description

No retorna ningún valor.

Source code in app/services/crm_client.py
async def update_loads(self, loads_id: dict, updates_data: dict):
    """
    Actualiza múltiples cargas en el CRM.

    Args:
        loads_id (dict): Un diccionario con los IDs de las cargas a actualizar, indexados por número de BOL.
        updates_data (dict): Un diccionario con los datos de actualización para cada carga, indexados por número de BOL.

    Returns:
        No retorna ningún valor.
    """
    tasks = []
    for bol_number, load_data in loads_id.items():
        load_id = load_data.get("id")
        data = updates_data.get(bol_number, {})
        if load_id and data:
            tasks.append(self._patch_load(bol_number, load_id, data))

    await asyncio.gather(*tasks)
    logger.info("Actualización de loads finalizada.")

Functions

evaluate_if_need_hazmat_contact(primus_result)

Evalúa si se necesita un contacto Hazmat basado en el contenido del primus_result.

Parameters:

Name Type Description Default
primus_result dict

El resultado del procesamiento de Primus que contiene la información de la carga.

required

Returns:

Name Type Description
bool bool

True si se encuentra algún artículo peligroso (hazmat), False en caso contrario.

Source code in app/services/crm_client.py
def evaluate_if_need_hazmat_contact(primus_result: dict) -> bool:
    """
    Evalúa si se necesita un contacto Hazmat basado en el contenido del primus_result.

    Args:
        primus_result (dict): El resultado del procesamiento de Primus que contiene la información de la carga.

    Returns:
        bool: True si se encuentra algún artículo peligroso (hazmat), False en caso contrario.
    """
    lineItems = primus_result.get("freightInfo", [])
    for lineItem in lineItems:
        hazmat = lineItem.get("hazmat", False)
        if hazmat:
            return True
    return False

Megatron Client

Cliente que se conecta a la API de Megatron, utilizada para realizar cálculos específicos como los pies lineales (linear feet).

app.services.megatron_client

app.services.megatron_client

Classes

MegatronClient

Cliente para interactuar con la API de Megatron.

Esta clase se encarga de calcular los pies lineales (linear feet) de los Bills of Lading (BOLs) utilizando la API de Megatron.

Source code in app/services/megatron_client.py
class MegatronClient:
    """
    Cliente para interactuar con la API de Megatron.

    Esta clase se encarga de calcular los pies lineales (linear feet) 
    de los Bills of Lading (BOLs) utilizando la API de Megatron.
    """
    def __init__(self):
        """
        Inicializa el cliente de Megatron.

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor.
        """
        self.base_url = settings.megatron_base_url
        self.token = settings.megatron_token

    async def _post_linear_feet(
        self, client: httpx.AsyncClient, bol_number: str, lf_in: LinearFeet, headers: dict
    ) -> tuple[str, int | None]:
        """
        Envía una solicitud POST a la API de Megatron para calcular los pies lineales de un solo BOL.

        Args:
            client (httpx.AsyncClient): Cliente HTTP asíncrono para realizar la solicitud.
            bol_number (str): El número de BOL para el cual se calcularán los pies lineales.
            lf_in (LinearFeet): Objeto Pydantic con los datos de entrada para el cálculo de pies lineales.
            headers (dict): Diccionario de encabezados HTTP para la solicitud.

        Returns:
            tuple[str, dict]: Una tupla que contiene el número de BOL y un diccionario con el total de pies lineales calculados.
                              Si ocurre un error, retorna el número de BOL y un diccionario con 'total_linear_feet' en 0.0.
        """
        try:
            resp = await client.post(
                f"{self.base_url}/linear-feet/calculate",
                headers=headers,
                json=lf_in.model_dump()
            )
            resp.raise_for_status()

            return bol_number, {"total_linear_feet":resp.json().get("total_linear_feet", 0.0)}

        except Exception as e:
            error_detail = ""
            if hasattr(e, "response") and e.response is not None:
                try:
                    error_json = e.response.json()
                    error_detail = error_json.get("detail", str(error_json))
                except Exception:
                    error_detail = e.response.text
            else:
                error_detail = str(e)

            logger.exception(f"Error calculating linear feet for BOL {bol_number}: {e} - {error_detail}")
            email_service.send_email(
                dynamic_data={
                    "bol_number": bol_number,
                    "error_message": f"Error calculating linear feet for BOL {bol_number}: {e} - {error_detail}"
                }
            )
            return bol_number, {"total_linear_feet": 0.0}

    async def calculate_linear_feet_per_bol(self, primus_results: list, llm_results: dict[str,dict]):
        """
        Calcula los pies lineales para múltiples Bills of Lading (BOLs) en paralelo.

        Para cada BOL, construye el objeto LinearFeetIn, realiza una llamada a la API de Megatron
        y devuelve los resultados.

        Args:
            primus_results (list): Una lista de diccionarios con los resultados de Primus.
            llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.

        Returns:
            dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
                  que contiene el total de pies lineales calculados.
        """
        headers = {
            "access-token": self.token,
            "Content-Type": "application/json"
        }
        tasks = []

        async with httpx.AsyncClient() as client:
            for primus_result in primus_results:
                bol_number = str(primus_result["BOLNumber"])
                lf_in = build_linear_feet_obj(primus_result, llm_results)

                if not lf_in:
                    logger.warning(f"No hay data LLM para BOL {bol_number}, se omite")
                    continue

                tasks.append(
                    self._post_linear_feet(client, bol_number, lf_in, headers)
                )

            results = await asyncio.gather(*tasks, return_exceptions=False)

        return {bol: lf_out for bol, lf_out in results}
Functions
__init__()

Inicializa el cliente de Megatron.

Returns:

Type Description

No retorna ningún valor.

Source code in app/services/megatron_client.py
def __init__(self):
    """
    Inicializa el cliente de Megatron.

    Args:
        No recibe argumentos.

    Returns:
        No retorna ningún valor.
    """
    self.base_url = settings.megatron_base_url
    self.token = settings.megatron_token
calculate_linear_feet_per_bol(primus_results, llm_results) async

Calcula los pies lineales para múltiples Bills of Lading (BOLs) en paralelo.

Para cada BOL, construye el objeto LinearFeetIn, realiza una llamada a la API de Megatron y devuelve los resultados.

Parameters:

Name Type Description Default
primus_results list

Una lista de diccionarios con los resultados de Primus.

required
llm_results dict[str, dict]

Un diccionario con los resultados del LLM, indexados por número de BOL.

required

Returns:

Name Type Description
dict

Un diccionario donde la clave es el número de BOL y el valor es un diccionario que contiene el total de pies lineales calculados.

Source code in app/services/megatron_client.py
async def calculate_linear_feet_per_bol(self, primus_results: list, llm_results: dict[str,dict]):
    """
    Calcula los pies lineales para múltiples Bills of Lading (BOLs) en paralelo.

    Para cada BOL, construye el objeto LinearFeetIn, realiza una llamada a la API de Megatron
    y devuelve los resultados.

    Args:
        primus_results (list): Una lista de diccionarios con los resultados de Primus.
        llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.

    Returns:
        dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
              que contiene el total de pies lineales calculados.
    """
    headers = {
        "access-token": self.token,
        "Content-Type": "application/json"
    }
    tasks = []

    async with httpx.AsyncClient() as client:
        for primus_result in primus_results:
            bol_number = str(primus_result["BOLNumber"])
            lf_in = build_linear_feet_obj(primus_result, llm_results)

            if not lf_in:
                logger.warning(f"No hay data LLM para BOL {bol_number}, se omite")
                continue

            tasks.append(
                self._post_linear_feet(client, bol_number, lf_in, headers)
            )

        results = await asyncio.gather(*tasks, return_exceptions=False)

    return {bol: lf_out for bol, lf_out in results}

Functions

Email Service

Servicio de utilidad para el envío de notificaciones por correo electrónico, principalmente en caso de errores durante el proceso.

app.services.send_email

app.services.send_email

Classes

EmailService

Servicio para el envío de correos electrónicos utilizando SendGrid.

Esta clase encapsula la lógica para enviar correos electrónicos, especialmente para notificaciones de errores, utilizando una plantilla dinámica.

Source code in app/services/send_email.py
class EmailService:
    """
    Servicio para el envío de correos electrónicos utilizando SendGrid.

    Esta clase encapsula la lógica para enviar correos electrónicos, 
    especialmente para notificaciones de errores, utilizando una plantilla dinámica.
    """
    def __init__(self):
        """
        Inicializa el cliente de SendGrid.

        Args:
            No recibe argumentos.

        Returns:
            No retorna ningún valor.
        """
        self.sg_client = SendGridAPIClient(settings.sendgrid_api_key)

    def send_email(
        self, *, dynamic_data: dict
    ):
        """
        Envía un correo electrónico utilizando una plantilla dinámica de SendGrid.

        Args:
            dynamic_data (dict): Un diccionario con los datos dinámicos que se inyectarán en la plantilla del correo.
                                 Debe contener al menos una clave "subject" si se desea un asunto personalizado,
                                 de lo contrario, se usará un asunto predeterminado.

        Returns:
            None: Este método no retorna ningún valor explícito. Registra un error si el envío falla.
        """
        sg_message = Mail(
            from_email=settings.sendgrid_from,
            to_emails=settings.sendgrid_to,
        )

        sg_message.template_id = settings.sendgrid_id_template

        sg_message.dynamic_template_data = {
            "subject": "Primus load creation failed",
            **dynamic_data,
        }

        try:
            self.sg_client.send(sg_message)
        except Exception as e:
            logger.error(f"[ERROR] enviando email de notificación: {str(e)}")
            return None
Functions
__init__()

Inicializa el cliente de SendGrid.

Returns:

Type Description

No retorna ningún valor.

Source code in app/services/send_email.py
def __init__(self):
    """
    Inicializa el cliente de SendGrid.

    Args:
        No recibe argumentos.

    Returns:
        No retorna ningún valor.
    """
    self.sg_client = SendGridAPIClient(settings.sendgrid_api_key)
send_email(*, dynamic_data)

Envía un correo electrónico utilizando una plantilla dinámica de SendGrid.

Parameters:

Name Type Description Default
dynamic_data dict

Un diccionario con los datos dinámicos que se inyectarán en la plantilla del correo. Debe contener al menos una clave "subject" si se desea un asunto personalizado, de lo contrario, se usará un asunto predeterminado.

required

Returns:

Name Type Description
None

Este método no retorna ningún valor explícito. Registra un error si el envío falla.

Source code in app/services/send_email.py
def send_email(
    self, *, dynamic_data: dict
):
    """
    Envía un correo electrónico utilizando una plantilla dinámica de SendGrid.

    Args:
        dynamic_data (dict): Un diccionario con los datos dinámicos que se inyectarán en la plantilla del correo.
                             Debe contener al menos una clave "subject" si se desea un asunto personalizado,
                             de lo contrario, se usará un asunto predeterminado.

    Returns:
        None: Este método no retorna ningún valor explícito. Registra un error si el envío falla.
    """
    sg_message = Mail(
        from_email=settings.sendgrid_from,
        to_emails=settings.sendgrid_to,
    )

    sg_message.template_id = settings.sendgrid_id_template

    sg_message.dynamic_template_data = {
        "subject": "Primus load creation failed",
        **dynamic_data,
    }

    try:
        self.sg_client.send(sg_message)
    except Exception as e:
        logger.error(f"[ERROR] enviando email de notificación: {str(e)}")
        return None