API Reference
En esta sección se detalla la documentación del código fuente de la aplicación, extraída directamente de los docstrings. La documentación está organizada siguiendo el flujo de ejecución del procesamiento de los Bills of Lading (BOLs).
Scheduler
El proceso puede ser iniciado automáticamente por un planificador de tareas (scheduler) que se ejecuta a intervalos regulares.
app.scheduler.jobs
Este módulo se encarga de configurar y ejecutar la tarea programada que llama al endpoint de procesamiento de BOLs.
app.scheduler.jobs
Functions
call_process_bol()
async
Realiza una llamada HTTP POST al endpoint de procesamiento de BOLs.
Esta función asíncrona construye la URL y la fecha de inicio para la solicitud,
y luego envía una petición POST al servicio process_bol.
Returns:
| Name | Type | Description |
|---|---|---|
None |
La función no retorna un valor explícito, pero registra el resultado de la llamada HTTP. |
Source code in app/scheduler/jobs.py
async def call_process_bol():
"""
Realiza una llamada HTTP POST al endpoint de procesamiento de BOLs.
Esta función asíncrona construye la URL y la fecha de inicio para la solicitud,
y luego envía una petición POST al servicio `process_bol`.
Args:
No recibe argumentos.
Returns:
None: La función no retorna un valor explícito, pero registra el resultado de la llamada HTTP.
"""
url = settings.process_bol_url
timeout = settings.http_client_timeout_seconds
start_date = datetime.now(ZoneInfo(settings.timezone)).isoformat()
async with httpx.AsyncClient(timeout=timeout) as client:
r = await client.post(url, json={"start_date": start_date})
logger.info(
"[Scheduler] Called process_bol with start_date=%s -> %s: %s",
start_date, r.status_code, r.text
)
start_scheduler()
Inicia el planificador de tareas (scheduler) para ejecutar call_process_bol a intervalos regulares.
Configura un AsyncIOScheduler que ejecutará la función call_process_bol
cada cierto número de minutos, definido en las configuraciones de la aplicación.
Returns:
| Name | Type | Description |
|---|---|---|
AsyncIOScheduler |
AsyncIOScheduler
|
Una instancia del planificador de tareas iniciado. |
Source code in app/scheduler/jobs.py
def start_scheduler() -> AsyncIOScheduler:
"""
Inicia el planificador de tareas (scheduler) para ejecutar `call_process_bol` a intervalos regulares.
Configura un `AsyncIOScheduler` que ejecutará la función `call_process_bol`
cada cierto número de minutos, definido en las configuraciones de la aplicación.
Args:
No recibe argumentos.
Returns:
AsyncIOScheduler: Una instancia del planificador de tareas iniciado.
"""
scheduler = AsyncIOScheduler(timezone=settings.timezone)
scheduler.add_job(call_process_bol, IntervalTrigger(minutes=settings.internal_trigger_minutes))
scheduler.start()
logger.info("Scheduler started")
return scheduler
API Endpoint
El endpoint principal que orquesta todo el flujo de procesamiento de BOLs.
app.api.process
Este módulo contiene el endpoint /fetch-dataload que recibe la solicitud de procesamiento.
app.api.process
Functions
fetch_dataload(request, body)
async
Procesa los Bills of Lading (BOLs) para crear y actualizar cargas (loads).
Este endpoint orquesta un flujo de varios pasos que comienza con la recuperación de archivos adjuntos (BOLs en formato PDF) desde una cuenta de correo electrónico a partir de una fecha determinada. Posteriormente, consulta información adicional de los BOLs en un sistema externo (Primus), procesa los PDFs utilizando un agente multimodal para extraer datos clave, y enriquece la información con datos de un CRM. Finalmente, crea y actualiza los registros de carga (loads) en el CRM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
request
|
Request
|
El objeto de solicitud de FastAPI, que contiene el estado de la aplicación (clientes de servicios, etc.). |
required |
body
|
StartDateRequest
|
El cuerpo de la solicitud, que debe contener la fecha de inicio para filtrar los correos electrónicos. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
JSONResponse |
Una respuesta JSON que contiene los IDs de las cargas (loads) creadas en el CRM, con un código de estado 200 en caso de éxito. Si no se encuentran archivos adjuntos, retorna una lista vacía. |
Source code in app/api/process.py
@router.post(
"/fetch-dataload",
response_class=JSONResponse,
response_model=List[FinalLoad],
status_code=200,
responses={200: {"description": "Loads fetched successfully"}},
)
async def fetch_dataload(
request: Request,
body: StartDateRequest
):
"""
Procesa los Bills of Lading (BOLs) para crear y actualizar cargas (loads).
Este endpoint orquesta un flujo de varios pasos que comienza con la recuperación de
archivos adjuntos (BOLs en formato PDF) desde una cuenta de correo electrónico a partir
de una fecha determinada. Posteriormente, consulta información adicional de los BOLs
en un sistema externo (Primus), procesa los PDFs utilizando un agente multimodal
para extraer datos clave, y enriquece la información con datos de un CRM.
Finalmente, crea y actualiza los registros de carga (loads) en el CRM.
Args:
request (Request): El objeto de solicitud de FastAPI, que contiene el estado de la
aplicación (clientes de servicios, etc.).
body (StartDateRequest): El cuerpo de la solicitud, que debe contener la fecha
de inicio para filtrar los correos electrónicos.
Returns:
JSONResponse: Una respuesta JSON que contiene los IDs de las cargas (loads) creadas
en el CRM, con un código de estado 200 en caso de éxito. Si no se
encuentran archivos adjuntos, retorna una lista vacía.
"""
t0 = time.time()
state: AppState = request.app.state
# clientes inicializados en startup_event
mail_client = state.mail_client
primus_client = state.primus_client
crm_client = state.crm_client
megatron_client = state.megatron_client
start_dt = datetime.fromisoformat(body.start_date)
# 1. Traer adjuntos desde correo
attachments = await mail_client.fetch_bol_attachments(start_dt, limit=10)
if not attachments:
logger.warning("No attachments found for start_date=%s", start_dt)
return JSONResponse(
status_code=200,
content=jsonable_encoder([]),
)
bol_codes, filtered_attach = mail_client.extract_bol_codes(attachments)
logger.info(f"{len(bol_codes)} BOLs were found: {bol_codes}")
# 2. Consultar booking data en Primus
primus_data = await primus_client.get_booking_data(bol_codes)
primus_data = primus_data.get("data", {}).get("results", [])
primus_bols = [str(item["BOLNumber"]) for item in primus_data]
remains = set(bol_codes) - set(primus_bols) # identificar BOLs sin data en Primus
if remains:
logger.warning(f"No data found in Primus for BOLs: {remains}")
for bol_number in remains:
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"No data found in Primus for BOL {bol_number}"
}
)
# 3. Procesar cada BOL con el agente multimodal
llm_results = await process_pdfs(
agent=multimodal_agent,
prompt=prompts.PROMPT_BOL,
attachments=filtered_attach,
response_schema=response_schema.SCHEMA_BOL_EXTRACTION,
)
primus_data = map_units(primus_data,llm_results) # mapear unidades de peso y dimension si es necesario
# 4. Integrar resultados de CRM (Accounts)
accounts_results = await crm_client.create_accounts_id(primus_data)
customer_mode_id = await crm_client.get_customer_and_mode_id(primus_data)
accessorials_results = map_accesorials_to_glt(primus_data, llm_results, MAPPER_ACCESORIALS_DATA)
# 5. Integrar resultados de CRM (Contacts)
contacts_results = await crm_client.create_contacts_id(primus_data, accounts_results)
# 6. Integrar resultados de Megatron (linear feet)
linear_feet_by_bol = await megatron_client.calculate_linear_feet_per_bol(primus_data, llm_results)
# 7. Create loads
loads = build_final_loads(
primus_data=primus_data,
llm_results=llm_results,
accounts_results=accounts_results,
contacts_results=contacts_results,
linear_feet_by_bol=linear_feet_by_bol,
customer_mode_id=customer_mode_id,
accessorials_results=accessorials_results,
)
loads_id = await crm_client.create_loads(loads)
# 8. Update loads (carrier & customer quotes) -> NOTE Aquí ya se actualiza el carrier_bill_to
carrier_services = await crm_client.carrier_service(primus_data)
await crm_client.create_quotes(primus_data, llm_results, loads_id, carrier_services)
carrier_bill_ids = await crm_client.create_carrier_bill_id(llm_results)
# 9. Actualizar datos en los loads.
updates_data = {
f"{item['BOLNumber']}": {
"po_number": ' - '.join([
item.get("thirdParty", {}).get("referenceNumber", ""),
item.get("consignee", {}).get("referenceNumber", "")
]),
"pro_number": item.get("vendor",{}).get("PRO",""),
**(
{"mode_id": "a0k1I00000B6pAYQAZ"}
if SCACS_MAPPING.get(
item.get("vendor",{}).get("SCAC",""),
item.get("vendor",{}).get("SCAC","")) == "FDEG"
else {}
)
}
for item in primus_data
}
for bol_code, values in updates_data.items():
bill_to_third_party = llm_results.get(bol_code,{}).get('parsed',{}).get("load",{}).get("bill_to_third_party")
values["external_tms_integration"] = True
values["carrier_id"] = carrier_services.get(bol_code,{}).get("carrier","")
if bol_code in carrier_bill_ids:
values["carrier_bill_id"] = carrier_bill_ids.get(bol_code,{}).get("id","")
if bill_to_third_party:
if "BASINGEN GROUP" in bill_to_third_party:
values["document_options_id"] = DOCUMENT_OPTIONS.get("BASINGEN GROUP","")
else:
values["document_options_id"] = DOCUMENT_OPTIONS.get("GLT","")
await crm_client.update_loads(loads_id, updates_data) # UPDATE LOAD
elapsed = time.time() - t0
logger.info(f"Processed {len(bol_codes)} BOLs in {elapsed:.2f} seconds")
return JSONResponse(
status_code=200,
content=loads_id,
)
Servicios
A continuación se detallan los diferentes servicios y clientes que son utilizados por el endpoint principal para llevar a cabo el procesamiento.
Mail Client
Este servicio se conecta a la API de correo para obtener los correos electrónicos y extraer los archivos adjuntos (PDFs de los BOLs).
app.services.mail_client
app.services.mail_client
Classes
MailClient
Cliente para interactuar con la API interna de correo.
Esta clase gestiona la comunicación con un servicio de correo para obtener correos electrónicos y sus archivos adjuntos, aplicando filtros específicos para los Bills of Lading (BOLs). También implementa un mecanismo para evitar el procesamiento duplicado de correos ya vistos.
Source code in app/services/mail_client.py
class MailClient:
"""
Cliente para interactuar con la API interna de correo.
Esta clase gestiona la comunicación con un servicio de correo para obtener
correos electrónicos y sus archivos adjuntos, aplicando filtros específicos
para los Bills of Lading (BOLs). También implementa un mecanismo para evitar
el procesamiento duplicado de correos ya vistos.
"""
def __init__(self):
"""
Inicializa el cliente de correo.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.mail_api_url
self.subject_filter = settings.mail_filter_subject
self.from_filter = settings.mail_filter_from
self.local_tz = ZoneInfo(settings.timezone)
self.lookback_minutes = settings.lookback_minutes
self.timeout = settings.http_client_timeout_seconds
# Guardamos {graph_id: timestamp_visto}
self._seen_ids: dict[str, datetime] = {}
self.ttl_hours = 24 # caducidad de 24h
def _cleanup_seen_ids(self):
"""
Limpia los IDs de correos que han superado su tiempo de vida (TTL).
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
cutoff = datetime.now(timezone.utc) - timedelta(hours=self.ttl_hours)
expired = [gid for gid, ts in self._seen_ids.items() if ts < cutoff]
for gid in expired:
del self._seen_ids[gid]
def _mark_seen(self, graph_id: str):
"""
Registra un ID de correo como "visto".
Args:
graph_id (str): El ID único del correo a registrar.
Returns:
No retorna ningún valor.
"""
self._seen_ids[graph_id] = datetime.now(timezone.utc)
def _is_seen(self, graph_id: str) -> bool:
"""
Verifica si un ID de correo ya ha sido procesado recientemente.
Args:
graph_id (str): El ID único del correo a verificar.
Returns:
bool: True si el ID ya ha sido visto, False en caso contrario.
"""
return graph_id in self._seen_ids
def extract_bol_codes(self, attachments: list[dict]):
"""
Extrae los códigos de BOL de una lista de archivos adjuntos.
Utiliza una expresión regular para encontrar archivos adjuntos que coincidan
con el patrón "BOL_XXXX.pdf" y extrae el código numérico.
Args:
attachments (list[dict]): Una lista de diccionarios de archivos adjuntos.
Returns:
tuple[list[str], list[dict]]: Una tupla que contiene una lista de códigos
de BOL extraídos y una lista de los archivos
adjuntos que coincidieron con el patrón.
"""
pattern = re.compile(r"BOL[_-](\d+)\.pdf$", re.IGNORECASE)
codes = []
filtered_attachments = []
for att in attachments:
name = att.get("name", "")
match = pattern.search(name)
if match:
codes.append(match.group(1))
filtered_attachments.append(att)
return codes, filtered_attachments
async def fetch_bol_attachments(self, start_date: datetime, limit: int = 10) -> list[dict]:
"""
Recupera adjuntos de correos que contienen Bills of Lading (BOL).
Consulta el endpoint `/filtered-messages` de la API de correo para obtener
mensajes que coincidan con los filtros configurados (asunto y remitente).
Luego, para cada correo, obtiene sus archivos adjuntos.
Args:
start_date (datetime): La fecha y hora de inicio para filtrar los correos.
limit (int, optional): El número máximo de correos a procesar.
Defaults to 10.
Returns:
list[dict]: Una lista de diccionarios de archivos adjuntos. Cada diccionario
contiene el nombre ('name') y el contenido ('content') del adjunto.
"""
# limpiar viejos ids antes de cada consulta
self._cleanup_seen_ids()
# asegurar que start_date tiene zona horaria local
start_date = start_date.replace(tzinfo=self.local_tz)
# convertir a UTC después de ajustar el lookback
utc_start_date = (start_date - timedelta(minutes=self.lookback_minutes)).astimezone(timezone.utc)
logger.info("[Graph] Called GRAPH start_date=%s", utc_start_date.isoformat())
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.get(
f"{self.base_url}/api/graph/filtered-messages",
params={
"start_date": utc_start_date.isoformat(), # Graph espera UTC
"limit": limit,
"subject_filter": self.subject_filter,
"from_filter": self.from_filter,
},
)
resp.raise_for_status()
mails = resp.json()
att_data = []
for mail in mails:
graph_id = mail.get("graph_id")
if not graph_id:
continue
# ignorar si ya fue procesado
if self._is_seen(graph_id):
continue
att_resp = await client.get(f"{self.base_url}/api/graph/v2/attachments/{graph_id}")
att_resp.raise_for_status()
attachments = att_resp.json()
if attachments:
att_data.extend(attachments)
self._mark_seen(graph_id)
return att_data
Functions
__init__()
Inicializa el cliente de correo.
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor. |
Source code in app/services/mail_client.py
def __init__(self):
"""
Inicializa el cliente de correo.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.mail_api_url
self.subject_filter = settings.mail_filter_subject
self.from_filter = settings.mail_filter_from
self.local_tz = ZoneInfo(settings.timezone)
self.lookback_minutes = settings.lookback_minutes
self.timeout = settings.http_client_timeout_seconds
# Guardamos {graph_id: timestamp_visto}
self._seen_ids: dict[str, datetime] = {}
self.ttl_hours = 24 # caducidad de 24h
extract_bol_codes(attachments)
Extrae los códigos de BOL de una lista de archivos adjuntos.
Utiliza una expresión regular para encontrar archivos adjuntos que coincidan con el patrón "BOL_XXXX.pdf" y extrae el código numérico.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
attachments
|
list[dict]
|
Una lista de diccionarios de archivos adjuntos. |
required |
Returns:
| Type | Description |
|---|---|
|
tuple[list[str], list[dict]]: Una tupla que contiene una lista de códigos de BOL extraídos y una lista de los archivos adjuntos que coincidieron con el patrón. |
Source code in app/services/mail_client.py
def extract_bol_codes(self, attachments: list[dict]):
"""
Extrae los códigos de BOL de una lista de archivos adjuntos.
Utiliza una expresión regular para encontrar archivos adjuntos que coincidan
con el patrón "BOL_XXXX.pdf" y extrae el código numérico.
Args:
attachments (list[dict]): Una lista de diccionarios de archivos adjuntos.
Returns:
tuple[list[str], list[dict]]: Una tupla que contiene una lista de códigos
de BOL extraídos y una lista de los archivos
adjuntos que coincidieron con el patrón.
"""
pattern = re.compile(r"BOL[_-](\d+)\.pdf$", re.IGNORECASE)
codes = []
filtered_attachments = []
for att in attachments:
name = att.get("name", "")
match = pattern.search(name)
if match:
codes.append(match.group(1))
filtered_attachments.append(att)
return codes, filtered_attachments
fetch_bol_attachments(start_date, limit=10)
async
Recupera adjuntos de correos que contienen Bills of Lading (BOL).
Consulta el endpoint /filtered-messages de la API de correo para obtener
mensajes que coincidan con los filtros configurados (asunto y remitente).
Luego, para cada correo, obtiene sus archivos adjuntos.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
start_date
|
datetime
|
La fecha y hora de inicio para filtrar los correos. |
required |
limit
|
int
|
El número máximo de correos a procesar. Defaults to 10. |
10
|
Returns:
| Type | Description |
|---|---|
list[dict]
|
list[dict]: Una lista de diccionarios de archivos adjuntos. Cada diccionario contiene el nombre ('name') y el contenido ('content') del adjunto. |
Source code in app/services/mail_client.py
async def fetch_bol_attachments(self, start_date: datetime, limit: int = 10) -> list[dict]:
"""
Recupera adjuntos de correos que contienen Bills of Lading (BOL).
Consulta el endpoint `/filtered-messages` de la API de correo para obtener
mensajes que coincidan con los filtros configurados (asunto y remitente).
Luego, para cada correo, obtiene sus archivos adjuntos.
Args:
start_date (datetime): La fecha y hora de inicio para filtrar los correos.
limit (int, optional): El número máximo de correos a procesar.
Defaults to 10.
Returns:
list[dict]: Una lista de diccionarios de archivos adjuntos. Cada diccionario
contiene el nombre ('name') y el contenido ('content') del adjunto.
"""
# limpiar viejos ids antes de cada consulta
self._cleanup_seen_ids()
# asegurar que start_date tiene zona horaria local
start_date = start_date.replace(tzinfo=self.local_tz)
# convertir a UTC después de ajustar el lookback
utc_start_date = (start_date - timedelta(minutes=self.lookback_minutes)).astimezone(timezone.utc)
logger.info("[Graph] Called GRAPH start_date=%s", utc_start_date.isoformat())
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.get(
f"{self.base_url}/api/graph/filtered-messages",
params={
"start_date": utc_start_date.isoformat(), # Graph espera UTC
"limit": limit,
"subject_filter": self.subject_filter,
"from_filter": self.from_filter,
},
)
resp.raise_for_status()
mails = resp.json()
att_data = []
for mail in mails:
graph_id = mail.get("graph_id")
if not graph_id:
continue
# ignorar si ya fue procesado
if self._is_seen(graph_id):
continue
att_resp = await client.get(f"{self.base_url}/api/graph/v2/attachments/{graph_id}")
att_resp.raise_for_status()
attachments = att_resp.json()
if attachments:
att_data.extend(attachments)
self._mark_seen(graph_id)
return att_data
Primus Client
Cliente utilizado para conectarse a la API de Primus y obtener la información de las reservas (bookings) asociada a los BOLs.
app.services.primus_client
app.services.primus_client
Classes
PrimusClient
Cliente para interactuar con la API de Primus.
Esta clase maneja la autenticación y las solicitudes a la API de Primus para obtener datos de reservas (bookings).
Source code in app/services/primus_client.py
class PrimusClient:
"""
Cliente para interactuar con la API de Primus.
Esta clase maneja la autenticación y las solicitudes a la API de Primus
para obtener datos de reservas (bookings).
"""
def __init__(self):
"""
Inicializa el cliente de Primus.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.primus_base_url
self.user = settings.primus_user
self.password = settings.primus_password
self.timeout = settings.http_client_timeout_seconds
self.token = None
async def login(self):
"""
Autentica con la API de Primus y almacena el token de acceso.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor, pero almacena el token de acceso en self.token.
Levanta una excepción si la autenticación falla.
"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(
f"{self.base_url}/login",
json={"username": self.user, "password": self.password}
)
resp.raise_for_status()
data = resp.json()
self.token = data.get("data", {}).get("accessToken")
async def get_booking_data(self, bol_numbers: list[str]) -> dict:
"""
Recupera la información de la reserva para una lista de números de BOL de Primus.
Args:
bol_numbers (list[str]): Una lista de números de BOL (Bill of Lading) para consultar.
Returns:
dict: Un diccionario con los datos de la reserva obtenidos de la API.
Si el token ha expirado, intenta autenticarse de nuevo y reintentar la solicitud.
Levanta una excepción si la solicitud a la API falla.
"""
if not self.token:
await self.login()
headers = {"Authorization": f"Bearer {self.token}"}
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.get(
f"{self.base_url}/book",
headers=headers,
params={
"BOLNumber[]": bol_numbers
}
)
if resp.status_code == 401: # Token expirado
await self.login()
return await self.get_booking_data(bol_numbers)
resp.raise_for_status()
return resp.json()
Functions
__init__()
Inicializa el cliente de Primus.
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor. |
Source code in app/services/primus_client.py
def __init__(self):
"""
Inicializa el cliente de Primus.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.primus_base_url
self.user = settings.primus_user
self.password = settings.primus_password
self.timeout = settings.http_client_timeout_seconds
self.token = None
get_booking_data(bol_numbers)
async
Recupera la información de la reserva para una lista de números de BOL de Primus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bol_numbers
|
list[str]
|
Una lista de números de BOL (Bill of Lading) para consultar. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
dict
|
Un diccionario con los datos de la reserva obtenidos de la API. Si el token ha expirado, intenta autenticarse de nuevo y reintentar la solicitud. |
dict
|
Levanta una excepción si la solicitud a la API falla. |
Source code in app/services/primus_client.py
async def get_booking_data(self, bol_numbers: list[str]) -> dict:
"""
Recupera la información de la reserva para una lista de números de BOL de Primus.
Args:
bol_numbers (list[str]): Una lista de números de BOL (Bill of Lading) para consultar.
Returns:
dict: Un diccionario con los datos de la reserva obtenidos de la API.
Si el token ha expirado, intenta autenticarse de nuevo y reintentar la solicitud.
Levanta una excepción si la solicitud a la API falla.
"""
if not self.token:
await self.login()
headers = {"Authorization": f"Bearer {self.token}"}
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.get(
f"{self.base_url}/book",
headers=headers,
params={
"BOLNumber[]": bol_numbers
}
)
if resp.status_code == 401: # Token expirado
await self.login()
return await self.get_booking_data(bol_numbers)
resp.raise_for_status()
return resp.json()
login()
async
Autentica con la API de Primus y almacena el token de acceso.
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor, pero almacena el token de acceso en self.token. |
|
|
Levanta una excepción si la autenticación falla. |
Source code in app/services/primus_client.py
async def login(self):
"""
Autentica con la API de Primus y almacena el token de acceso.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor, pero almacena el token de acceso en self.token.
Levanta una excepción si la autenticación falla.
"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(
f"{self.base_url}/login",
json={"username": self.user, "password": self.password}
)
resp.raise_for_status()
data = resp.json()
self.token = data.get("data", {}).get("accessToken")
Agente de IA (PDF Processing)
Este conjunto de módulos se encarga de procesar los archivos PDF de los BOLs utilizando un modelo de IA generativa (Gemini) para extraer la información estructurada.
app.services.agent.process_pdf
Este módulo orquesta el procesamiento en paralelo de los PDFs, gestionando la concurrencia y los reintentos.
app.services.agent.process_pdf
Classes
Functions
process_pdfs(agent, prompt, attachments, response_schema)
async
Procesa una lista de archivos PDF en paralelo utilizando el LLM.
Crea y ejecuta tareas asíncronas para cada archivo adjunto, gestionando la concurrencia. Recopila los resultados y maneja las excepciones de forma individual para cada tarea, asegurando que el fallo de un PDF no detenga el procesamiento de los demás.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
GenaiProcessor
|
La instancia del procesador Genai a utilizar. |
required |
prompt
|
str
|
El prompt a enviar al LLM para cada PDF. |
required |
attachments
|
list[dict]
|
Una lista de diccionarios, donde cada uno representa un archivo PDF a procesar. |
required |
response_schema
|
dict
|
El esquema JSON que el LLM debe seguir para las respuestas. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, dict]
|
dict[str, dict]: Un diccionario consolidado con los resultados exitosos, donde cada clave es un número de BOL. Los procesamientos fallidos se registran como errores y no se incluyen en el retorno. |
Source code in app/services/agent/process_pdf.py
async def process_pdfs(
agent: GenaiProcessor,
prompt: str,
attachments: list[dict],
response_schema: dict,
) -> dict[str, dict]:
"""
Procesa una lista de archivos PDF en paralelo utilizando el LLM.
Crea y ejecuta tareas asíncronas para cada archivo adjunto, gestionando la
concurrencia. Recopila los resultados y maneja las excepciones de forma
individual para cada tarea, asegurando que el fallo de un PDF no detenga
el procesamiento de los demás.
Args:
agent (GenaiProcessor): La instancia del procesador Genai a utilizar.
prompt (str): El prompt a enviar al LLM para cada PDF.
attachments (list[dict]): Una lista de diccionarios, donde cada uno
representa un archivo PDF a procesar.
response_schema (dict): El esquema JSON que el LLM debe seguir para
las respuestas.
Returns:
dict[str, dict]: Un diccionario consolidado con los resultados exitosos,
donde cada clave es un número de BOL. Los procesamientos fallidos
se registran como errores y no se incluyen en el retorno.
"""
tasks = [
_call_llm_threadsafe(agent, prompt, att, response_schema)
for att in attachments
]
results = await asyncio.gather(*tasks, return_exceptions=True)
final_results: dict[str, dict] = {}
for i, r in enumerate(results):
if isinstance(r, Exception):
logger.error("Falló el procesamiento del PDF %d: %s", i, r)
continue
# r tiene la forma {"BOL123": {"parsed": ..., "usage": ...}}
final_results.update(r)
return final_results
retry_policy()
Define y retorna una política de reintentos para manejar errores transitorios.
Utiliza una estrategia de backoff exponencial, reintentando hasta 5 veces
cuando se producen errores específicos como RateLimitError, APIError de Google,
o HTTPStatusError.
Returns:
| Name | Type | Description |
|---|---|---|
Callable |
Callable
|
Un decorador de reintentos de la librería |
Source code in app/services/agent/process_pdf.py
def retry_policy() -> Callable:
"""
Define y retorna una política de reintentos para manejar errores transitorios.
Utiliza una estrategia de backoff exponencial, reintentando hasta 5 veces
cuando se producen errores específicos como `RateLimitError`, `APIError` de Google,
o `HTTPStatusError`.
Returns:
Callable: Un decorador de reintentos de la librería `tenacity`.
"""
return retry(
wait=wait_exponential(multiplier=1, min=2, max=30), # 2s, 4s, 8s... hasta 30s
stop=stop_after_attempt(5), # máximo 5 intentos
retry=retry_if_exception_type((RateLimitError, APIError, HTTPStatusError)),
reraise=True,
)
safe_json_load(response, bol_number)
Parsea una cadena JSON de forma segura, manejando posibles errores.
Si la cadena no es un JSON válido, registra un error, envía un correo de notificación y retorna una estructura de respuesta por defecto.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
response
|
str
|
La cadena de texto que se espera que sea un JSON. |
required |
bol_number
|
str
|
El número de BOL asociado a la respuesta, para incluirlo en notificaciones de error. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
dict
|
El objeto JSON parseado, o un diccionario por defecto si ocurre un error. |
Source code in app/services/agent/process_pdf.py
def safe_json_load(response: str, bol_number:str) -> dict:
"""
Parsea una cadena JSON de forma segura, manejando posibles errores.
Si la cadena no es un JSON válido, registra un error, envía un correo
de notificación y retorna una estructura de respuesta por defecto.
Args:
response (str): La cadena de texto que se espera que sea un JSON.
bol_number (str): El número de BOL asociado a la respuesta, para
incluirlo en notificaciones de error.
Returns:
dict: El objeto JSON parseado, o un diccionario por defecto si ocurre un error.
"""
try:
return json.loads(response)
except (json.JSONDecodeError, TypeError):
logger.error("Error parsing JSON response from LLM")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"[ERROR] Respuesta invalida LLM para BOL: {bol_number}"
}
)
return DEFAULT_LLM_RESPONSE.model_dump()
app.services.agent.genai_client
Este cliente encapsula la comunicación con la API del modelo de IA generativa.
app.services.agent.genai_client
Classes
GenaiProcessor
Clase para interactuar con los modelos de IA generativa de Google a través del SDK de Genai.
Esta clase encapsula la configuración y la comunicación con los modelos de Vertex AI, proporcionando métodos para generar contenido a partir de URIs, bytes de archivos, y para manejar sesiones de chat.
Source code in app/services/agent/genai_client.py
class GenaiProcessor:
"""
Clase para interactuar con los modelos de IA generativa de Google a través del SDK de Genai.
Esta clase encapsula la configuración y la comunicación con los modelos de Vertex AI,
proporcionando métodos para generar contenido a partir de URIs, bytes de archivos,
y para manejar sesiones de chat.
"""
def __init__(
self,
model_name: str,
system_instruction: str | None = None,
) -> None:
"""
Inicializa el cliente de GenaiProcessor.
Args:
model_name (str): El nombre del modelo a utilizar (ej. "gemini-2.5-pro").
system_instruction (str | None, optional): Instrucción de sistema para
guiar el comportamiento del modelo. Defaults to None.
"""
self.client = genai.Client(
vertexai=True,
project=os.environ["VERTEX_PROJECT_ID"],
location=os.environ["VERTEX_LOCATION"],
)
self.model_name = model_name
self.system_instruction = system_instruction
self._thinking_models = {"gemini-2.5-flash", "gemini-2.5-pro"}
def _get_safety_settings(self, threshold: HarmBlockThreshold) -> list[SafetySetting]:
"""
Construye la configuración de seguridad para las solicitudes a la API.
Args:
threshold (HarmBlockThreshold): El nivel de umbral para bloquear contenido
dañino.
Returns:
list[SafetySetting]: Una lista de objetos SafetySetting para cada categoría
de contenido dañino.
"""
return [
SafetySetting(category=category, threshold=threshold)
for category in [
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
HarmCategory.HARM_CATEGORY_HARASSMENT,
HarmCategory.HARM_CATEGORY_HATE_SPEECH,
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
]
]
def _normalize_files(
self,
files: list[dict[str, str]] | None,
bucket_path: str | None,
mime_type: str | None,
) -> list[dict[str, str]]:
"""
Normaliza la lista de archivos para la solicitud a la API.
Valida y formatea la lista de archivos. Si `files` es None, la inicializa.
Si `bucket_path` se proporciona y la lista de archivos está vacía, crea
una entrada de archivo para el bucket.
Args:
files (list[dict[str, str]] | None): La lista de archivos a normalizar.
Cada archivo es un dict con 'file_uri' y 'mime_type'.
bucket_path (str | None): La ruta GCS a un archivo.
mime_type (str | None): El tipo MIME del archivo en `bucket_path`.
Returns:
list[dict[str, str]]: La lista de archivos normalizada.
Raises:
ValueError: Si `files` no es una lista de diccionarios o si a un
archivo le falta 'file_uri' or 'mime_type'.
"""
if files is None:
files = []
if not isinstance(files, list) or not all(isinstance(f, dict) for f in files):
raise ValueError("`files` must be a list of dictionaries.")
if not files and bucket_path:
files = [
{
"file_uri": bucket_path,
"mime_type": mime_type or "application/pdf",
},
]
for f in files:
if not f.get("file_uri") or not f.get("mime_type"):
raise ValueError("Each file must include 'file_uri' and 'mime_type'.")
return files
def _build_generation_config(
self,
*,
response_schema: dict | None = None,
response_mime_type: str = "application/json",
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> GenerateContentConfig:
"""
Construye el objeto de configuración para la generación de contenido.
Args:
response_schema (dict | None, optional): El esquema para la respuesta. Defaults to None.
response_mime_type (str, optional): El tipo MIME de la respuesta. Defaults to "application/json".
temperature (float, optional): La temperatura de la generación. Defaults to 0.0.
max_output_tokens (int, optional): Máximo de tokens en la respuesta. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Top-p para el muestreo. Defaults to 1.0.
seed (int, optional): Semilla para la reproducibilidad. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Umbral de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
GenerateContentConfig: El objeto de configuración de generación.
"""
config = {
"system_instruction": self.system_instruction,
"response_schema": response_schema,
"response_mime_type": response_mime_type,
"temperature": temperature,
"max_output_tokens": max_output_tokens,
"presence_penalty": presence_penalty,
"frequency_penalty": frequency_penalty,
"top_p": top_p,
"seed": seed,
"safety_settings": self._get_safety_settings(safety_threshold),
}
if self.model_name in self._thinking_models:
config["thinking_config"] = ThinkingConfig(
thinking_budget=thinking_budget,
)
return GenerateContentConfig(**config)
def extract_usage_metadata(self, response: object) -> dict:
"""
Extrae los metadatos de uso de la respuesta de la API.
Args:
response (object): El objeto de respuesta de la API de Genai.
Returns:
dict: Un diccionario con el recuento de tokens (prompt, respuesta, total, y thoughts si aplica).
"""
usage = {
"prompt_tokens": getattr(response.usage_metadata, "prompt_token_count", None),
"response_tokens": getattr(response.usage_metadata, "candidates_token_count", None),
"total_tokens": getattr(response.usage_metadata, "total_token_count", None),
}
if self.model_name in self._thinking_models:
usage["thoughts_tokens"] = getattr(
response.usage_metadata, "thoughts_token_count", None,
)
return usage
def generate_from_uri(
self,
prompt: str,
*,
files: list[dict[str, str]] | None = None,
bucket_path: str | None = None,
mime_type: str | None = None,
response_schema: dict | None = None,
response_mime_type: str = "application/json",
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> dict[str, object]:
"""
Genera contenido a partir de un prompt y archivos especificados por URI (GCS).
Args:
prompt (str): El prompt de texto para el modelo.
files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
dict[str, object]: Un diccionario con la respuesta completa de la API,
los metadatos de uso y la respuesta parseada.
Raises:
VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
"""
files = self._normalize_files(files, bucket_path, mime_type)
contents = [
Part.from_uri(
file_uri=f["file_uri"],
mime_type=f["mime_type"],
) for f in files
] + [
Part.from_text(text=prompt),
]
config = self._build_generation_config(
response_schema=response_schema,
response_mime_type=response_mime_type,
temperature=temperature,
max_output_tokens=max_output_tokens,
presence_penalty=presence_penalty,
frequency_penalty=frequency_penalty,
top_p=top_p,
seed=seed,
safety_threshold=safety_threshold,
thinking_budget=thinking_budget
)
try:
response = self.client.models.generate_content(
model=self.model_name,
contents=contents,
config=config,
)
except APIError as e:
raise VertexAIExceptionError(
original_exception=e,
) from e
usage = self.extract_usage_metadata(response)
return {
"response": response,
"usage": usage,
"parsed": self.parse_response(response),
}
def generate_from_bytes(
self,
prompt: str,
*,
pdf_bytes: bytes,
mime_type: str = "application/pdf",
response_schema: dict | None = None,
response_mime_type: str = "application/json",
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> dict[str, object]:
"""
Genera contenido a partir de un prompt y los bytes de un archivo PDF.
Args:
prompt (str): El prompt de texto para el modelo.
pdf_bytes (bytes): Los bytes del archivo PDF a procesar.
mime_type (str, optional): El tipo MIME del archivo. Defaults to "application/pdf".
response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
dict[str, object]: Un diccionario con la respuesta completa de la API,
los metadatos de uso y la respuesta parseada.
"""
contents = [
Part.from_bytes(
data=pdf_bytes,
mime_type = mime_type,
),
Part.from_text(
text=prompt,
),
]
config = {
"system_instruction": self.system_instruction,
"response_mime_type": response_mime_type,
"response_schema": response_schema,
"temperature": temperature,
"max_output_tokens": max_output_tokens,
"presence_penalty": presence_penalty,
"frequency_penalty": frequency_penalty,
"top_p": top_p,
"seed": seed,
"safety_settings": self._get_safety_settings(
safety_threshold,
),
}
if self.model_name in self._thinking_models:
config["thinking_config"] = ThinkingConfig(
thinking_budget=thinking_budget,
)
response = self.client.models.generate_content(
model=self.model_name,
contents=contents,
config=GenerateContentConfig(**config),
)
usage = self.extract_usage_metadata(response)
return {
"response": response,
"usage": usage,
"parsed": self.parse_response(response),
}
def get_chat_session(
self,
*,
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> object:
"""
Crea y retorna una nueva sesión de chat con la configuración especificada.
Args:
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
object: Un objeto de sesión de chat del SDK de Genai.
"""
config = self._build_generation_config(
temperature=temperature,
max_output_tokens=max_output_tokens,
presence_penalty=presence_penalty,
frequency_penalty=frequency_penalty,
top_p=top_p,
seed=seed,
safety_threshold=safety_threshold,
thinking_budget=thinking_budget,
)
return self.client.chats.create(
model=self.model_name,
config=config,
)
def generate_chat(
self,
chat: object,
prompt: str,
*,
files: list[dict[str, str]] | None = None,
bucket_path: str | None = None,
mime_type: str | None = None,
response_schema: dict | None = None,
response_mime_type: str = "application/json",
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> dict[str, object]:
"""
Envía un mensaje a una sesión de chat existente y obtiene una respuesta.
Args:
chat (object): La sesión de chat activa.
prompt (str): El prompt de texto para el modelo.
files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
dict[str, object]: Un diccionario con la respuesta completa de la API,
los metadatos de uso y la respuesta parseada.
Raises:
VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
"""
files = self._normalize_files(files, bucket_path, mime_type)
parts = [
Part.from_text(text=prompt),
] + [
Part.from_uri(
file_uri=f["file_uri"],
mime_type=f["mime_type"],
) for f in files
]
config = self._build_generation_config(
response_schema=response_schema,
response_mime_type=response_mime_type,
temperature=temperature,
max_output_tokens=max_output_tokens,
presence_penalty=presence_penalty,
frequency_penalty=frequency_penalty,
top_p=top_p,
seed=seed,
safety_threshold=safety_threshold,
thinking_budget=thinking_budget,
)
try:
response = chat.send_message(message=parts,
config=config)
except APIError as e:
raise VertexAIExceptionError(
original_exception=e,
) from e
usage = self.extract_usage_metadata(response)
return {
"response": response,
"usage": usage,
"parsed": self.parse_response(response),
}
def parse_response(self, response: object) -> dict:
"""
Parsea la respuesta de la API para separar los 'thoughts' de las 'answers'.
Args:
response (object): El objeto de respuesta de la API de Genai.
Returns:
dict: Un diccionario con dos claves, 'thoughts' y 'answers', cada una
conteniendo una lista de las partes correspondientes de la respuesta.
"""
parsed = {
"thoughts": [],
"answers": [],
}
if not response.candidates or not response.candidates[0].content.parts:
return parsed
for part in response.candidates[0].content.parts:
if part.thought:
parsed["thoughts"].append(part.text)
else:
parsed["answers"].append(part.text)
return parsed
Functions
__init__(model_name, system_instruction=None)
Inicializa el cliente de GenaiProcessor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
model_name
|
str
|
El nombre del modelo a utilizar (ej. "gemini-2.5-pro"). |
required |
system_instruction
|
str | None
|
Instrucción de sistema para guiar el comportamiento del modelo. Defaults to None. |
None
|
Source code in app/services/agent/genai_client.py
def __init__(
self,
model_name: str,
system_instruction: str | None = None,
) -> None:
"""
Inicializa el cliente de GenaiProcessor.
Args:
model_name (str): El nombre del modelo a utilizar (ej. "gemini-2.5-pro").
system_instruction (str | None, optional): Instrucción de sistema para
guiar el comportamiento del modelo. Defaults to None.
"""
self.client = genai.Client(
vertexai=True,
project=os.environ["VERTEX_PROJECT_ID"],
location=os.environ["VERTEX_LOCATION"],
)
self.model_name = model_name
self.system_instruction = system_instruction
self._thinking_models = {"gemini-2.5-flash", "gemini-2.5-pro"}
extract_usage_metadata(response)
Extrae los metadatos de uso de la respuesta de la API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
response
|
object
|
El objeto de respuesta de la API de Genai. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
dict
|
Un diccionario con el recuento de tokens (prompt, respuesta, total, y thoughts si aplica). |
Source code in app/services/agent/genai_client.py
def extract_usage_metadata(self, response: object) -> dict:
"""
Extrae los metadatos de uso de la respuesta de la API.
Args:
response (object): El objeto de respuesta de la API de Genai.
Returns:
dict: Un diccionario con el recuento de tokens (prompt, respuesta, total, y thoughts si aplica).
"""
usage = {
"prompt_tokens": getattr(response.usage_metadata, "prompt_token_count", None),
"response_tokens": getattr(response.usage_metadata, "candidates_token_count", None),
"total_tokens": getattr(response.usage_metadata, "total_token_count", None),
}
if self.model_name in self._thinking_models:
usage["thoughts_tokens"] = getattr(
response.usage_metadata, "thoughts_token_count", None,
)
return usage
generate_chat(chat, prompt, *, files=None, bucket_path=None, mime_type=None, response_schema=None, response_mime_type='application/json', temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)
Envía un mensaje a una sesión de chat existente y obtiene una respuesta.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chat
|
object
|
La sesión de chat activa. |
required |
prompt
|
str
|
El prompt de texto para el modelo. |
required |
files
|
list[dict[str, str]] | None
|
Lista de archivos en GCS. Defaults to None. |
None
|
bucket_path
|
str | None
|
Ruta a un único archivo en GCS. Defaults to None. |
None
|
mime_type
|
str | None
|
Tipo MIME para |
None
|
response_schema
|
dict | None
|
Esquema de respuesta esperado. Defaults to None. |
None
|
response_mime_type
|
str
|
Tipo MIME de la respuesta. Defaults to "application/json". |
'application/json'
|
temperature
|
float
|
Controla la aleatoriedad. Defaults to 0.0. |
0.0
|
max_output_tokens
|
int
|
Límite de tokens de salida. Defaults to 8192. |
8192
|
presence_penalty
|
float
|
Penalización por presencia. Defaults to 0.0. |
0.0
|
frequency_penalty
|
float
|
Penalización por frecuencia. Defaults to 0.0. |
0.0
|
top_p
|
float
|
Muestreo nucleus. Defaults to 1.0. |
1.0
|
seed
|
int
|
Semilla para resultados deterministas. Defaults to 20180507. |
20180507
|
safety_threshold
|
HarmBlockThreshold
|
Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE. |
BLOCK_NONE
|
thinking_budget
|
int
|
Presupuesto para 'thinking'. Defaults to 0. |
0
|
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
dict[str, object]: Un diccionario con la respuesta completa de la API, los metadatos de uso y la respuesta parseada. |
Raises:
| Type | Description |
|---|---|
VertexAIExceptionError
|
Si ocurre un error en la llamada a la API de Genai. |
Source code in app/services/agent/genai_client.py
def generate_chat(
self,
chat: object,
prompt: str,
*,
files: list[dict[str, str]] | None = None,
bucket_path: str | None = None,
mime_type: str | None = None,
response_schema: dict | None = None,
response_mime_type: str = "application/json",
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> dict[str, object]:
"""
Envía un mensaje a una sesión de chat existente y obtiene una respuesta.
Args:
chat (object): La sesión de chat activa.
prompt (str): El prompt de texto para el modelo.
files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
dict[str, object]: Un diccionario con la respuesta completa de la API,
los metadatos de uso y la respuesta parseada.
Raises:
VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
"""
files = self._normalize_files(files, bucket_path, mime_type)
parts = [
Part.from_text(text=prompt),
] + [
Part.from_uri(
file_uri=f["file_uri"],
mime_type=f["mime_type"],
) for f in files
]
config = self._build_generation_config(
response_schema=response_schema,
response_mime_type=response_mime_type,
temperature=temperature,
max_output_tokens=max_output_tokens,
presence_penalty=presence_penalty,
frequency_penalty=frequency_penalty,
top_p=top_p,
seed=seed,
safety_threshold=safety_threshold,
thinking_budget=thinking_budget,
)
try:
response = chat.send_message(message=parts,
config=config)
except APIError as e:
raise VertexAIExceptionError(
original_exception=e,
) from e
usage = self.extract_usage_metadata(response)
return {
"response": response,
"usage": usage,
"parsed": self.parse_response(response),
}
generate_from_bytes(prompt, *, pdf_bytes, mime_type='application/pdf', response_schema=None, response_mime_type='application/json', temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)
Genera contenido a partir de un prompt y los bytes de un archivo PDF.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prompt
|
str
|
El prompt de texto para el modelo. |
required |
pdf_bytes
|
bytes
|
Los bytes del archivo PDF a procesar. |
required |
mime_type
|
str
|
El tipo MIME del archivo. Defaults to "application/pdf". |
'application/pdf'
|
response_schema
|
dict | None
|
Esquema de respuesta esperado. Defaults to None. |
None
|
response_mime_type
|
str
|
Tipo MIME de la respuesta. Defaults to "application/json". |
'application/json'
|
temperature
|
float
|
Controla la aleatoriedad. Defaults to 0.0. |
0.0
|
max_output_tokens
|
int
|
Límite de tokens de salida. Defaults to 8192. |
8192
|
presence_penalty
|
float
|
Penalización por presencia. Defaults to 0.0. |
0.0
|
frequency_penalty
|
float
|
Penalización por frecuencia. Defaults to 0.0. |
0.0
|
top_p
|
float
|
Muestreo nucleus. Defaults to 1.0. |
1.0
|
seed
|
int
|
Semilla para resultados deterministas. Defaults to 20180507. |
20180507
|
safety_threshold
|
HarmBlockThreshold
|
Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE. |
BLOCK_NONE
|
thinking_budget
|
int
|
Presupuesto para 'thinking'. Defaults to 0. |
0
|
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
dict[str, object]: Un diccionario con la respuesta completa de la API, los metadatos de uso y la respuesta parseada. |
Source code in app/services/agent/genai_client.py
def generate_from_bytes(
self,
prompt: str,
*,
pdf_bytes: bytes,
mime_type: str = "application/pdf",
response_schema: dict | None = None,
response_mime_type: str = "application/json",
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> dict[str, object]:
"""
Genera contenido a partir de un prompt y los bytes de un archivo PDF.
Args:
prompt (str): El prompt de texto para el modelo.
pdf_bytes (bytes): Los bytes del archivo PDF a procesar.
mime_type (str, optional): El tipo MIME del archivo. Defaults to "application/pdf".
response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
dict[str, object]: Un diccionario con la respuesta completa de la API,
los metadatos de uso y la respuesta parseada.
"""
contents = [
Part.from_bytes(
data=pdf_bytes,
mime_type = mime_type,
),
Part.from_text(
text=prompt,
),
]
config = {
"system_instruction": self.system_instruction,
"response_mime_type": response_mime_type,
"response_schema": response_schema,
"temperature": temperature,
"max_output_tokens": max_output_tokens,
"presence_penalty": presence_penalty,
"frequency_penalty": frequency_penalty,
"top_p": top_p,
"seed": seed,
"safety_settings": self._get_safety_settings(
safety_threshold,
),
}
if self.model_name in self._thinking_models:
config["thinking_config"] = ThinkingConfig(
thinking_budget=thinking_budget,
)
response = self.client.models.generate_content(
model=self.model_name,
contents=contents,
config=GenerateContentConfig(**config),
)
usage = self.extract_usage_metadata(response)
return {
"response": response,
"usage": usage,
"parsed": self.parse_response(response),
}
generate_from_uri(prompt, *, files=None, bucket_path=None, mime_type=None, response_schema=None, response_mime_type='application/json', temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)
Genera contenido a partir de un prompt y archivos especificados por URI (GCS).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prompt
|
str
|
El prompt de texto para el modelo. |
required |
files
|
list[dict[str, str]] | None
|
Lista de archivos en GCS. Defaults to None. |
None
|
bucket_path
|
str | None
|
Ruta a un único archivo en GCS. Defaults to None. |
None
|
mime_type
|
str | None
|
Tipo MIME para |
None
|
response_schema
|
dict | None
|
Esquema de respuesta esperado. Defaults to None. |
None
|
response_mime_type
|
str
|
Tipo MIME de la respuesta. Defaults to "application/json". |
'application/json'
|
temperature
|
float
|
Controla la aleatoriedad. Defaults to 0.0. |
0.0
|
max_output_tokens
|
int
|
Límite de tokens de salida. Defaults to 8192. |
8192
|
presence_penalty
|
float
|
Penalización por presencia. Defaults to 0.0. |
0.0
|
frequency_penalty
|
float
|
Penalización por frecuencia. Defaults to 0.0. |
0.0
|
top_p
|
float
|
Muestreo nucleus. Defaults to 1.0. |
1.0
|
seed
|
int
|
Semilla para resultados deterministas. Defaults to 20180507. |
20180507
|
safety_threshold
|
HarmBlockThreshold
|
Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE. |
BLOCK_NONE
|
thinking_budget
|
int
|
Presupuesto para 'thinking'. Defaults to 0. |
0
|
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
dict[str, object]: Un diccionario con la respuesta completa de la API, los metadatos de uso y la respuesta parseada. |
Raises:
| Type | Description |
|---|---|
VertexAIExceptionError
|
Si ocurre un error en la llamada a la API de Genai. |
Source code in app/services/agent/genai_client.py
def generate_from_uri(
self,
prompt: str,
*,
files: list[dict[str, str]] | None = None,
bucket_path: str | None = None,
mime_type: str | None = None,
response_schema: dict | None = None,
response_mime_type: str = "application/json",
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> dict[str, object]:
"""
Genera contenido a partir de un prompt y archivos especificados por URI (GCS).
Args:
prompt (str): El prompt de texto para el modelo.
files (list[dict[str, str]] | None, optional): Lista de archivos en GCS. Defaults to None.
bucket_path (str | None, optional): Ruta a un único archivo en GCS. Defaults to None.
mime_type (str | None, optional): Tipo MIME para `bucket_path`. Defaults to None.
response_schema (dict | None, optional): Esquema de respuesta esperado. Defaults to None.
response_mime_type (str, optional): Tipo MIME de la respuesta. Defaults to "application/json".
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
dict[str, object]: Un diccionario con la respuesta completa de la API,
los metadatos de uso y la respuesta parseada.
Raises:
VertexAIExceptionError: Si ocurre un error en la llamada a la API de Genai.
"""
files = self._normalize_files(files, bucket_path, mime_type)
contents = [
Part.from_uri(
file_uri=f["file_uri"],
mime_type=f["mime_type"],
) for f in files
] + [
Part.from_text(text=prompt),
]
config = self._build_generation_config(
response_schema=response_schema,
response_mime_type=response_mime_type,
temperature=temperature,
max_output_tokens=max_output_tokens,
presence_penalty=presence_penalty,
frequency_penalty=frequency_penalty,
top_p=top_p,
seed=seed,
safety_threshold=safety_threshold,
thinking_budget=thinking_budget
)
try:
response = self.client.models.generate_content(
model=self.model_name,
contents=contents,
config=config,
)
except APIError as e:
raise VertexAIExceptionError(
original_exception=e,
) from e
usage = self.extract_usage_metadata(response)
return {
"response": response,
"usage": usage,
"parsed": self.parse_response(response),
}
get_chat_session(*, temperature=0.0, max_output_tokens=8192, presence_penalty=0.0, frequency_penalty=0.0, top_p=1.0, seed=20180507, safety_threshold=HarmBlockThreshold.BLOCK_NONE, thinking_budget=0)
Crea y retorna una nueva sesión de chat con la configuración especificada.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
temperature
|
float
|
Controla la aleatoriedad. Defaults to 0.0. |
0.0
|
max_output_tokens
|
int
|
Límite de tokens de salida. Defaults to 8192. |
8192
|
presence_penalty
|
float
|
Penalización por presencia. Defaults to 0.0. |
0.0
|
frequency_penalty
|
float
|
Penalización por frecuencia. Defaults to 0.0. |
0.0
|
top_p
|
float
|
Muestreo nucleus. Defaults to 1.0. |
1.0
|
seed
|
int
|
Semilla para resultados deterministas. Defaults to 20180507. |
20180507
|
safety_threshold
|
HarmBlockThreshold
|
Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE. |
BLOCK_NONE
|
thinking_budget
|
int
|
Presupuesto para 'thinking'. Defaults to 0. |
0
|
Returns:
| Name | Type | Description |
|---|---|---|
object |
object
|
Un objeto de sesión de chat del SDK de Genai. |
Source code in app/services/agent/genai_client.py
def get_chat_session(
self,
*,
temperature: float = 0.0,
max_output_tokens: int = 8192,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
top_p: float = 1.0,
seed: int = 20180507,
safety_threshold: HarmBlockThreshold = HarmBlockThreshold.BLOCK_NONE,
thinking_budget: int = 0,
) -> object:
"""
Crea y retorna una nueva sesión de chat con la configuración especificada.
Args:
temperature (float, optional): Controla la aleatoriedad. Defaults to 0.0.
max_output_tokens (int, optional): Límite de tokens de salida. Defaults to 8192.
presence_penalty (float, optional): Penalización por presencia. Defaults to 0.0.
frequency_penalty (float, optional): Penalización por frecuencia. Defaults to 0.0.
top_p (float, optional): Muestreo nucleus. Defaults to 1.0.
seed (int, optional): Semilla para resultados deterministas. Defaults to 20180507.
safety_threshold (HarmBlockThreshold, optional): Nivel de seguridad. Defaults to HarmBlockThreshold.BLOCK_NONE.
thinking_budget (int, optional): Presupuesto para 'thinking'. Defaults to 0.
Returns:
object: Un objeto de sesión de chat del SDK de Genai.
"""
config = self._build_generation_config(
temperature=temperature,
max_output_tokens=max_output_tokens,
presence_penalty=presence_penalty,
frequency_penalty=frequency_penalty,
top_p=top_p,
seed=seed,
safety_threshold=safety_threshold,
thinking_budget=thinking_budget,
)
return self.client.chats.create(
model=self.model_name,
config=config,
)
parse_response(response)
Parsea la respuesta de la API para separar los 'thoughts' de las 'answers'.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
response
|
object
|
El objeto de respuesta de la API de Genai. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
dict
|
Un diccionario con dos claves, 'thoughts' y 'answers', cada una conteniendo una lista de las partes correspondientes de la respuesta. |
Source code in app/services/agent/genai_client.py
def parse_response(self, response: object) -> dict:
"""
Parsea la respuesta de la API para separar los 'thoughts' de las 'answers'.
Args:
response (object): El objeto de respuesta de la API de Genai.
Returns:
dict: Un diccionario con dos claves, 'thoughts' y 'answers', cada una
conteniendo una lista de las partes correspondientes de la respuesta.
"""
parsed = {
"thoughts": [],
"answers": [],
}
if not response.candidates or not response.candidates[0].content.parts:
return parsed
for part in response.candidates[0].content.parts:
if part.thought:
parsed["thoughts"].append(part.text)
else:
parsed["answers"].append(part.text)
return parsed
Mapper
Este módulo contiene funciones de utilidad para mapear y transformar datos entre los diferentes formatos y sistemas (Primus, LLM, CRM).
app.services.mapper.mapper
app.services.mapper.mapper
Functions
build_account_obj(primus_result, account_type)
Construye un objeto Accounts para un shipper o consignee.
A partir de un resultado de Primus, esta función crea un objeto Accounts
que representa a un shipper (remitente) o a un consignee (destinatario),
truncando los campos de nombre y calle a 40 caracteres si es necesario.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_result
|
dict
|
Un diccionario que representa un único booking de Primus. |
required |
account_type
|
str
|
El tipo de cuenta a construir, debe ser 'shipper' o 'consignee'. |
required |
Returns:
| Type | Description |
|---|---|
Optional[Accounts]
|
Optional[Accounts]: Un objeto |
Source code in app/services/mapper/mapper.py
def build_account_obj(primus_result: dict, account_type: str) -> Optional[Accounts]:
"""
Construye un objeto Accounts para un shipper o consignee.
A partir de un resultado de Primus, esta función crea un objeto `Accounts`
que representa a un shipper (remitente) o a un consignee (destinatario),
truncando los campos de nombre y calle a 40 caracteres si es necesario.
Args:
primus_result (dict): Un diccionario que representa un único booking de Primus.
account_type (str): El tipo de cuenta a construir, debe ser 'shipper' o 'consignee'.
Returns:
Optional[Accounts]: Un objeto `Accounts` con la información de la cuenta.
Retorna `None` si el `account_type` no se encuentra en `primus_result`.
"""
if account_type not in primus_result:
return None
account_data = primus_result[account_type]
country_code_2letter = ""
country_code_3letter = account_data.get("country", "")
account_name = account_data.get("name", "")
street = account_data.get("address1", "")
if country_code_3letter == "USA":
country_code_2letter = "US"
elif country_code_3letter == "CAN":
country_code_2letter = "CA"
elif country_code_3letter == "MEX":
country_code_2letter = "MX"
if len(account_name) > 40:
account_name = account_name[:40] # Truncar a 40 caracteres si es necesario
if len(street) > 40:
street = street[:40]
return Accounts(
name=account_name,
street=street,
city=account_data.get("city", ""),
postal_code=account_data.get("zipCode", ""),
country_code=country_code_2letter,
state_code=account_data.get("state", ""),
phone=account_data.get("contactPhone", ""),
)
build_carrier_quote_object(primus_result, llm_entry, load_id, carrier_service)
Construye un objeto CarrierQuote a partir de los datos de Primus y del LLM.
Crea un objeto CarrierQuote que representa la cotización de un transportista,
determinando el tipo de contrato y extrayendo los detalles relevantes de la
cotización.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_result
|
dict
|
Un diccionario que representa un único booking de Primus. |
required |
llm_entry
|
dict
|
La entrada correspondiente del procesamiento del LLM. |
required |
load_id
|
str
|
El ID de la carga (load) a la que se asocia la cotización. |
required |
carrier_service
|
dict
|
Un diccionario con la información del servicio del transportista, incluyendo su ID. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
CarrierQuote |
CarrierQuote
|
Un objeto |
Source code in app/services/mapper/mapper.py
def build_carrier_quote_object(primus_result: dict, llm_entry:dict, load_id: str, carrier_service: dict) -> CarrierQuote:
"""
Construye un objeto CarrierQuote a partir de los datos de Primus y del LLM.
Crea un objeto `CarrierQuote` que representa la cotización de un transportista,
determinando el tipo de contrato y extrayendo los detalles relevantes de la
cotización.
Args:
primus_result (dict): Un diccionario que representa un único booking de Primus.
llm_entry (dict): La entrada correspondiente del procesamiento del LLM.
load_id (str): El ID de la carga (load) a la que se asocia la cotización.
carrier_service (dict): Un diccionario con la información del servicio del
transportista, incluyendo su ID.
Returns:
CarrierQuote: Un objeto `CarrierQuote` con los detalles de la cotización.
"""
vendor = primus_result.get("vendor", {})
bill_to_third_party = llm_entry.get('parsed',{}).get("load",{}).get("bill_to_third_party")
if not bill_to_third_party:
contract = ""
elif "BASINGEN" in bill_to_third_party:
contract = "BA"
else:
contract = "GLT"
# service_level = vendor.get("serviceLevel")
# mode = vendor.get("mode")
return CarrierQuote(
carrier_service=carrier_service.get("id",""),
service_class=f'{vendor.get("serviceLevel", "")}', # Sin el service_level, solo el tipo.
line_haul=vendor.get("cost"),
quote_contract_id=f'{vendor.get("SCAC")}_{contract}:{vendor.get("quoteNumber")}',
load_id=load_id,
vendor_id=carrier_service.get("carrier",""),
estimated_transit_time_days=vendor.get("transitDays"),
)
build_contact_obj(primus_result, account_type, account_id)
Construye un objeto Contact para un shipper o consignee.
Extrae la información de contacto de un shipper o consignee desde un resultado
de Primus y la utiliza para crear un objeto Contact.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_result
|
dict
|
Un diccionario que representa un único booking de Primus. |
required |
account_type
|
str
|
El tipo de cuenta ('shipper' o 'consignee') de la cual extraer el contacto. |
required |
account_id
|
str
|
El ID de la cuenta asociada a este contacto. |
required |
Returns:
| Type | Description |
|---|---|
Optional[Contact]
|
Optional[Contact]: Un objeto |
Source code in app/services/mapper/mapper.py
def build_contact_obj(primus_result: dict, account_type: str, account_id: str) -> Optional[Contact]:
"""
Construye un objeto Contact para un shipper o consignee.
Extrae la información de contacto de un shipper o consignee desde un resultado
de Primus y la utiliza para crear un objeto `Contact`.
Args:
primus_result (dict): Un diccionario que representa un único booking de Primus.
account_type (str): El tipo de cuenta ('shipper' o 'consignee') de la cual
extraer el contacto.
account_id (str): El ID de la cuenta asociada a este contacto.
Returns:
Optional[Contact]: Un objeto `Contact` con la información del contacto.
Retorna `None` si el `account_type` no está en `primus_result` o si
no se encuentra un nombre de contacto.
"""
if account_type not in primus_result:
return None
account_data = primus_result[account_type]
full_name = account_data.get("contact", "")
if not full_name:
return None
name_parts = full_name.split()
first_name = name_parts[0]
email_raw = account_data.get("email", "")
first_email = email_raw.split(";")[0].strip() if email_raw else ""
return Contact(
first_name=first_name,
last_name="Shipping",
phone=account_data.get("phone", ""),
email=first_email,
account_id=account_id
)
build_customer_quote_object(primus_result, load_id)
Construye un objeto CustomerQuote a partir de los datos de Primus.
Crea un objeto CustomerQuote que representa la cotización para el cliente,
extrayendo la información de contabilidad y la fecha de la cotización.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_result
|
dict
|
Un diccionario que representa un único booking de Primus. |
required |
load_id
|
str
|
El ID de la carga (load) a la que se asocia la cotización. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
CustomerQuote |
CustomerQuote
|
Un objeto |
Source code in app/services/mapper/mapper.py
def build_customer_quote_object(primus_result: dict, load_id: str) -> CustomerQuote:
"""
Construye un objeto CustomerQuote a partir de los datos de Primus.
Crea un objeto `CustomerQuote` que representa la cotización para el cliente,
extrayendo la información de contabilidad y la fecha de la cotización.
Args:
primus_result (dict): Un diccionario que representa un único booking de Primus.
load_id (str): El ID de la carga (load) a la que se asocia la cotización.
Returns:
CustomerQuote: Un objeto `CustomerQuote` con los detalles de la cotización
del cliente.
"""
accounting = primus_result.get("accountingInformation", {})
tracking = primus_result.get("trackingInformation", {})
net_line_haul = str(accounting.get("customerQuoteAmount"))
quote_date = tracking.get("bookDate")
dt = datetime.strptime(quote_date, "%Y-%m-%d %H:%M:%S")
return CustomerQuote(
load_id=load_id,
net_line_haul=float(net_line_haul.replace("*", "").strip()),
pricing="contract",
quote_date=str(dt.date())
)
build_final_loads(primus_data, llm_results, accounts_results, contacts_results, linear_feet_by_bol, customer_mode_id, accessorials_results)
Construye una lista de objetos FinalLoad a partir de datos procesados.
Esta función integra datos de múltiples fuentes (Primus, LLM, cuentas, contactos, etc.)
para construir una lista completa de objetos FinalLoad, cada uno representando
una carga final lista para ser procesada.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_data
|
list[dict]
|
Lista de datos de bookings de Primus. |
required |
llm_results
|
dict[str, dict]
|
Resultados del procesamiento del LLM. |
required |
accounts_results
|
dict
|
Resultados de la creación de cuentas. |
required |
contacts_results
|
dict
|
Resultados de la creación de contactos. |
required |
linear_feet_by_bol
|
dict
|
Datos de 'linear feet' calculados por BOL. |
required |
customer_mode_id
|
dict
|
Mapeo de customer ID y mode ID por BOL. |
required |
accessorials_results
|
dict
|
'Accessorials' mapeados y consolidados por BOL. |
required |
Returns:
| Type | Description |
|---|---|
List[FinalLoad]
|
List[FinalLoad]: Una lista de objetos |
Source code in app/services/mapper/mapper.py
def build_final_loads(
primus_data: list[dict],
llm_results: dict[str,dict],
accounts_results: dict,
contacts_results: dict,
linear_feet_by_bol: dict,
customer_mode_id: dict,
accessorials_results: dict,
) -> List[FinalLoad]:
"""
Construye una lista de objetos FinalLoad a partir de datos procesados.
Esta función integra datos de múltiples fuentes (Primus, LLM, cuentas, contactos, etc.)
para construir una lista completa de objetos `FinalLoad`, cada uno representando
una carga final lista para ser procesada.
Args:
primus_data (list[dict]): Lista de datos de bookings de Primus.
llm_results (dict[str, dict]): Resultados del procesamiento del LLM.
accounts_results (dict): Resultados de la creación de cuentas.
contacts_results (dict): Resultados de la creación de contactos.
linear_feet_by_bol (dict): Datos de 'linear feet' calculados por BOL.
customer_mode_id (dict): Mapeo de customer ID y mode ID por BOL.
accessorials_results (dict): 'Accessorials' mapeados y consolidados por BOL.
Returns:
List[FinalLoad]: Una lista de objetos `FinalLoad` completamente construidos.
"""
results: list[FinalLoad] = []
for bol in primus_data:
bol_code = str(bol["BOLNumber"])
# === load ===
load = Load(
total_weight=float(bol.get("totalWeight")),
customer_id=customer_mode_id.get(bol_code, {}).get("customer_id"),
mode_id=customer_mode_id.get(bol_code, {}).get("mode_id"),
linear_feet=float(linear_feet_by_bol.get(bol_code, {}).get("total_linear_feet")),
order_date=bol.get("trackingInformation", {}).get("pickupDateEstimated"),
)
# === stops ===
stops: list[Stop] = []
# pickup (stop 1)
pickup_info = bol.get('pickupInformation', {})
pickup_time_from = pickup_info.get('timeFrom', '')
pickup_time_to = pickup_info.get('timeTo', '')
if not pickup_time_from or not pickup_time_to:
pickup_hours = "08:00-17:00"
else:
pickup_hours = f"{pickup_time_from[:5]}-{pickup_time_to[:5]}"
stops.append(
Stop(
name="Stop 1",
number=1,
location_id=accounts_results.get("shipper",{}).get(bol_code, {}).get("id"),
contact_info_id=contacts_results.get("shipper",{}).get(bol_code, {}).get("id"),
shipping_receiving_hours=pickup_hours,
expected_date=bol.get("trackingInformation", {}).get("pickupDateEstimated"),
references=bol.get("shipper", {}).get("referenceNumber"),
instructions=bol.get("BOLInstructions"),
pickup_delivery_number=bol.get("vendor").get("pickupNumber",""),
is_pickup=True,
state_province=bol.get("shipper", {}).get("state"),
postal_code=bol.get("shipper", {}).get("zipCode"),
country=bol.get("shipper", {}).get("country"),
city=bol.get("shipper", {}).get("city")
)
)
# delivery (stop 2)
delivery_info = bol.get('deliveryInformation', {})
delivery_time_from = delivery_info.get('timeFrom', '')
delivery_time_to = delivery_info.get('timeTo', '')
if not delivery_time_from or not delivery_time_to:
delivery_hours = "08:00-17:00"
else:
delivery_hours = f"{delivery_time_from[:5]}-{delivery_time_to[:5]}"
stops.append(
Stop(
name="Stop 2",
number=2,
location_id=accounts_results.get("consignee",{}).get(bol_code, {}).get("id"),
contact_info_id=contacts_results.get("consignee",{}).get(bol_code, {}).get("id"),
shipping_receiving_hours=delivery_hours,
expected_date=bol.get("trackingInformation", {}).get("deliveryDateEstimated"),
references=bol.get("consignee", {}).get("referenceNumber"),
instructions=bol.get("BOLInstructions"),
is_dropoff=True,
state_province=bol.get("consignee", {}).get("state"),
postal_code=bol.get("consignee", {}).get("zipCode"),
country=bol.get("consignee", {}).get("country"),
city=bol.get("consignee", {}).get("city"),
)
)
# === line items ===
line_items: list[LineItem] = []
for idx, fi in enumerate(bol.get("freightInfo", [])):
parsed_llm = llm_results.get(bol_code,{}).get("parsed",{})
llm_line_item = get_llm_line_item(parsed_llm, idx)
line_items.append(
LineItem(
name=f"Line Item {idx+1}",
description=fi.get("commodity"),
handling_unit_count=fi.get("qty"),
handling_units=MAPPER_HU_DATA.get(fi.get("dimType")),
height=float(fi.get("height")),
length=float(fi.get("length")),
# linear_feet=linear_feet_by_bol.get(bol_code),
nmfc_class=str(fi.get("class")),
weight=float(fi.get("weight"))*int(fi.get("qty")),
width=float(fi.get("width")),
hazardous_materials=fi.get("hazmat", False),
hazmat_contact=contacts_results.get("hazmat",{}).get(bol_code, {}).get("id") or "",
hazmat_class_division=llm_line_item.get("hazmat_class_division", "") or "",
hazmat_number_type=llm_line_item.get("hazmat_number_type", "") or "",
hazmat_number=fi.get("UN"),
hazmat_packaging_group=llm_line_item.get("hazmat_packaging_group", "") or "",
nmfc_number=fi.get("nmfc"),
packaging_unit_count=llm_line_item.get("packaging_unit_count", 0) or 0,
stackable=llm_line_item.get("stackable", False) or False,
)
)
# === accessorials ===
accessorials: list[Accessorial] = [
Accessorial(name=acc["name"], accessorial_id=acc["accesorial_id"])
for acc in accessorials_results.get(bol_code, {}).get('accessorials',[])
]
# === FinalLoad ===
final_load = FinalLoad(
bol_number=bol_code,
load=load,
stops=stops,
line_items=line_items,
accessorials=accessorials,
)
results.append(final_load)
return results
build_hazmat_contact_obj(primus_result, account_id)
Construye un objeto Contact para emergencias de materiales peligrosos (Hazmat).
Extrae la información de contacto de emergencia desde un resultado de Primus
y crea un objeto Contact específico para Hazmat.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_result
|
dict
|
Un diccionario que representa un único booking de Primus. |
required |
account_id
|
str
|
El ID de la cuenta asociada a este contacto. |
required |
Returns:
| Type | Description |
|---|---|
Optional[Contact]
|
Optional[Contact]: Un objeto |
Source code in app/services/mapper/mapper.py
def build_hazmat_contact_obj(primus_result: dict, account_id: str) -> Optional[Contact]:
"""
Construye un objeto Contact para emergencias de materiales peligrosos (Hazmat).
Extrae la información de contacto de emergencia desde un resultado de Primus
y crea un objeto `Contact` específico para Hazmat.
Args:
primus_result (dict): Un diccionario que representa un único booking de Primus.
account_id (str): El ID de la cuenta asociada a este contacto.
Returns:
Optional[Contact]: Un objeto `Contact` con la información de contacto de
emergencia. Retorna `None` si no se encuentra el nombre o el teléfono
de contacto de emergencia.
"""
last_name = primus_result.get("emergencyContact", "")
if not last_name:
return None
phone = primus_result.get("emergencyPhone", "")
if not phone:
return None
return Contact(
first_name="Hazmat",
last_name=last_name,
phone=phone,
email="",
account_id=account_id
)
build_linear_feet_obj(primus_result, llm_results)
Construye un objeto LinearFeet a partir de los datos de Primus y del LLM.
Esta función extrae la información de los items de un booking de Primus y los
resultados del LLM para determinar si la carga es apilable (stackable) y
construir un objeto LinearFeet con los detalles de la carga.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_result
|
dict
|
Un diccionario que representa un único booking de Primus. |
required |
llm_results
|
dict[str, dict]
|
Un diccionario con los resultados del procesamiento del LLM, usando el BOLNumber como clave. |
required |
Returns:
| Type | Description |
|---|---|
Optional[LinearFeet]
|
Optional[LinearFeet]: Un objeto |
Source code in app/services/mapper/mapper.py
def build_linear_feet_obj(
primus_result: dict,
llm_results: dict[str,dict]
) -> Optional[LinearFeet]:
"""
Construye un objeto LinearFeet a partir de los datos de Primus y del LLM.
Esta función extrae la información de los items de un booking de Primus y los
resultados del LLM para determinar si la carga es apilable (stackable) y
construir un objeto `LinearFeet` con los detalles de la carga.
Args:
primus_result (dict): Un diccionario que representa un único booking de Primus.
llm_results (dict[str, dict]): Un diccionario con los resultados del
procesamiento del LLM, usando el BOLNumber como clave.
Returns:
Optional[LinearFeet]: Un objeto `LinearFeet` si se encuentra una coincidencia
de BOL en los resultados del LLM, de lo contrario, retorna `None`.
"""
bol_number = str(primus_result["BOLNumber"])
# Buscar el matching en LLM
llm_entry = llm_results.get(bol_number,{})
if not llm_entry:
return None # no hay matching LLM para este BOL
# Determinar stackable:
# puede ser a nivel global (si todos los ítems son stackables)
# Ejemplo: basta que al menos uno sea stackable para marcar True
llm_line_items = llm_entry.get('parsed',{}).get("line_items",[])
stackable = any(li.get("stackable") for li in llm_line_items)
# Mapear freightInfo de Primus a LineItembyLinearFeet
freight_info = primus_result.get("freightInfo", [])
line_items = [
LineItembyLinearFeet(
lenght=item["length"],
width=item["width"],
height=item["height"],
weight=item["weight"],
hu_count=item["qty"]
)
for item in freight_info
]
return LinearFeet(
line_items=line_items,
stackable=stackable
)
get_llm_line_item(parsed_llm, idx)
Obtiene de forma segura un 'line item' de los resultados del LLM.
Intenta acceder a un 'line item' específico por su índice dentro de la lista de 'line_items' en los resultados procesados del LLM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
parsed_llm
|
dict
|
El diccionario de resultados procesados del LLM. |
required |
idx
|
int
|
El índice del 'line item' a obtener. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
El 'line item' encontrado como un diccionario. Retorna un diccionario vacío si el índice está fuera de rango o si ocurre un error de tipo. |
Source code in app/services/mapper/mapper.py
def get_llm_line_item(parsed_llm, idx):
"""
Obtiene de forma segura un 'line item' de los resultados del LLM.
Intenta acceder a un 'line item' específico por su índice dentro de la lista
de 'line_items' en los resultados procesados del LLM.
Args:
parsed_llm (dict): El diccionario de resultados procesados del LLM.
idx (int): El índice del 'line item' a obtener.
Returns:
dict: El 'line item' encontrado como un diccionario. Retorna un
diccionario vacío si el índice está fuera de rango o si ocurre
un error de tipo.
"""
try:
return parsed_llm.get("line_items", [])[idx] or {}
except (IndexError, TypeError):
return {}
load_mapper(jsonfilename)
Carga un archivo de configuración JSON desde el directorio actual.
Esta función construye la ruta a un archivo JSON basándose en el nombre de archivo proporcionado y lo carga en un diccionario de Python.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
jsonfilename
|
str
|
El nombre del archivo JSON a cargar. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
dict
|
Un diccionario con el contenido del archivo JSON. |
Source code in app/services/mapper/mapper.py
def load_mapper(jsonfilename:str) -> dict:
"""
Carga un archivo de configuración JSON desde el directorio actual.
Esta función construye la ruta a un archivo JSON basándose en el nombre de archivo
proporcionado y lo carga en un diccionario de Python.
Args:
jsonfilename (str): El nombre del archivo JSON a cargar.
Returns:
dict: Un diccionario con el contenido del archivo JSON.
"""
base_dir = os.path.dirname(__file__) # carpeta actual: app/services/mapper
filepath = os.path.join(base_dir, jsonfilename, )
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
map_accesorials_to_glt(all_primus_data, llm_results, mapper_data)
Mapea los 'accessorials' de Primus a los de GLT y los consolida.
Esta función procesa una lista de bookings de Primus, mapea sus 'accessorials' utilizando un diccionario de mapeo, agrega 'accessorials' adicionales basados en los resultados del LLM (ej. 'Stackable'), y elimina duplicados.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
all_primus_data
|
list[dict]
|
Lista de bookings de Primus. |
required |
llm_results
|
dict[str, dict]
|
Resultados del procesamiento del LLM, usando el BOLNumber como clave. |
required |
mapper_data
|
dict
|
Diccionario para mapear los 'accessorials' de Primus a GLT. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, dict]
|
dict[str, dict]: Un diccionario donde cada clave es un BOLNumber y el valor es un diccionario que contiene una lista de 'accessorials' únicos para ese BOL. |
Source code in app/services/mapper/mapper.py
def map_accesorials_to_glt(all_primus_data: list[dict], llm_results:dict[str,dict], mapper_data: dict) -> dict[str, dict]:
"""
Mapea los 'accessorials' de Primus a los de GLT y los consolida.
Esta función procesa una lista de bookings de Primus, mapea sus 'accessorials'
utilizando un diccionario de mapeo, agrega 'accessorials' adicionales basados
en los resultados del LLM (ej. 'Stackable'), y elimina duplicados.
Args:
all_primus_data (list[dict]): Lista de bookings de Primus.
llm_results (dict[str, dict]): Resultados del procesamiento del LLM,
usando el BOLNumber como clave.
mapper_data (dict): Diccionario para mapear los 'accessorials' de Primus a GLT.
Returns:
dict[str, dict]: Un diccionario donde cada clave es un BOLNumber y el valor
es un diccionario que contiene una lista de 'accessorials' únicos para ese BOL.
"""
results = {}
for booking in all_primus_data:
bol_number = str(booking["BOLNumber"])
parsed_llm = llm_results.get(bol_number,{}).get("parsed", {})
accessorials_by_booking = booking["accessorials"]
accessorials = []
for accessorial in accessorials_by_booking: # list[str]
accessorials.extend(mapper_data.get(accessorial,[])) # list[dict]
for item in parsed_llm.get('line_items',[]):
if item["stackable"]:
accessorials.append({
"name": "Stackable",
"accesorial_id": "a01Rc000002Fw8z"
})
else:
accessorials.append({
"name": "Non Stackable - Fragile",
"accesorial_id": "a013s00000PKXDc"
})
unique_accessorials = list({frozenset(d.items()): d for d in accessorials}.values())
results[bol_number] = {'accessorials':unique_accessorials}
return results
map_units(primus_data, llm_results)
Convierte las unidades de peso y dimensión de los datos de Primus.
Esta función itera sobre los datos de Primus y, basándose en la información extraída por el LLM, convierte el peso de KG a LBS y las dimensiones de CM a IN cuando es necesario.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_data
|
list[dict]
|
Una lista de diccionarios, donde cada diccionario representa un booking de Primus. |
required |
llm_results
|
dict[str, dict]
|
Un diccionario con los resultados del procesamiento del LLM, usando el BOLNumber como clave. |
required |
Returns:
| Type | Description |
|---|---|
|
list[dict]: La lista de bookings de Primus con las unidades de peso y dimensión convertidas. |
Source code in app/services/mapper/mapper.py
def map_units(primus_data: list[dict], llm_results:dict[str,dict]):
"""
Convierte las unidades de peso y dimensión de los datos de Primus.
Esta función itera sobre los datos de Primus y, basándose en la información
extraída por el LLM, convierte el peso de KG a LBS y las dimensiones de CM a IN
cuando es necesario.
Args:
primus_data (list[dict]): Una lista de diccionarios, donde cada diccionario
representa un booking de Primus.
llm_results (dict[str, dict]): Un diccionario con los resultados del
procesamiento del LLM, usando el BOLNumber como clave.
Returns:
list[dict]: La lista de bookings de Primus con las unidades de peso y
dimensión convertidas.
"""
for booking in primus_data:
bol_number = str(booking["BOLNumber"])
parsed_llm = llm_results.get(bol_number,{}).get("parsed", {})
load_weight_units = parsed_llm.get("load", {}).get("weight_units", "LB")
line_item_dimension_units = any(li.get("dimension_units") == "CM" for li in parsed_llm.get("line_items", []))
if load_weight_units == "KG":
for fi in booking.get("freightInfo", []):
weight_in_lb = float(fi.get("weight", 0)) * 2.205 # Convertir KG a LBS
fi["weight"] = math.ceil(weight_in_lb)
total_weight_in_lb = float(booking.get("totalWeight", 0)) * 2.205 # Convertir KG a LBS
booking["totalWeight"] = math.ceil(total_weight_in_lb)
if line_item_dimension_units:
for fi in booking.get("freightInfo", []):
length_in = float(fi.get("length", 0)) * 0.393701 # Convertir CM a IN
width_in = float(fi.get("width", 0)) * 0.393701
height_in = float(fi.get("height", 0)) * 0.393701
fi["length"] = math.ceil(length_in)
fi["width"] = math.ceil(width_in)
fi["height"] = math.ceil(height_in)
return primus_data
CRM Client
Cliente para interactuar con la API del CRM (Salesforce). Se encarga de crear y actualizar cuentas, contactos, cargas (loads) y cotizaciones.
app.services.crm_client
app.services.crm_client
Classes
CRMClient
Cliente para interactuar con la API de CRM.
Esta clase maneja la autenticación y las solicitudes a la API de CRM para crear y actualizar cuentas, contactos, cargas y cotizaciones.
Source code in app/services/crm_client.py
class CRMClient:
"""
Cliente para interactuar con la API de CRM.
Esta clase maneja la autenticación y las solicitudes a la API de CRM
para crear y actualizar cuentas, contactos, cargas y cotizaciones.
"""
def __init__(self):
"""
Inicializa el cliente de CRM.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.crm_base_url
self.user = settings.crm_user
self.password = settings.crm_password
self.timeout = settings.http_client_timeout_seconds
self.semaphore = asyncio.Semaphore(settings.concurrency)
self.token = None
async def login(self):
"""
Autentica con el CRM y guarda el token de acceso.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor, pero almacena el token de acceso en self.token.
Levanta una excepción si la autenticación falla.
"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(
f"{self.base_url}/auth/access-token-v2",
data={
"grant_type": "password",
"username": self.user,
"password": self.password,
"scope": "",
"client_id": "",
"client_secret": ""
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
resp.raise_for_status()
self.token = resp.json().get("access_token")
logger.info("CRM login OK, nuevo token obtenido")
def _headers(self) -> dict:
"""
Genera los encabezados de autorización para las solicitudes al CRM.
Args:
No recibe argumentos.
Returns:
dict: Un diccionario con los encabezados de autorización y tipo de contenido.
"""
return {"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"}
async def _request_with_reauth(self, method: str, path: str, **kwargs) -> httpx.Response:
"""
Envía una solicitud HTTP al CRM, con reintento automático si el token está expirado.
Args:
method (str): El método HTTP a utilizar (GET, POST, PATCH, etc.).
path (str): La ruta del endpoint del CRM.
**kwargs: Argumentos adicionales para la solicitud httpx.request.
Returns:
httpx.Response: La respuesta de la solicitud HTTP.
Levanta una excepción si la solicitud falla después de los reintentos.
"""
if not self.token:
await self.login()
url = f"{self.base_url}/{path.lstrip('/')}"
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.request(method, url, headers=self._headers(), **kwargs)
if resp.status_code in (401, 403):
logger.warning("Token expirado o inválido, reintentando login...")
await self.login()
resp = await client.request(method, url, headers=self._headers(), **kwargs)
resp.raise_for_status()
return resp
# -------------------------------
# Métodos usando _request_with_reauth
# -------------------------------
async def _post_resource(self, bol_number: str, role: str, resource: Accounts | Contact, url_path: str):
"""
Envía una solicitud POST para crear un recurso (Cuenta o Contacto) en el CRM.
Incluye lógica de reintento para contactos con un número de teléfono predeterminado en caso de error.
Args:
bol_number (str): El número de BOL asociado al recurso.
role (str): El rol del recurso (e.g., "shipper", "consignee", "hazmat").
resource (Accounts | Contact): El objeto Pydantic del recurso a crear.
url_path (str): La ruta del endpoint para crear el recurso (e.g., "account", "contact").
Returns:
tuple: Una tupla que contiene el número de BOL, el rol y un diccionario con el ID del recurso creado.
Si falla, retorna un diccionario con un ID vacío.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth(
"POST",
url_path,
json=resource.model_dump(),
)
return bol_number, role, {"id": resp.json().get("id")}
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error creando ({role}) para BOL {bol_number}: {e} {error_detail}")
if isinstance(resource, Contact):
tp = "Contact"
else:
tp = "Account"
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error creando ({tp}-{role}) para BOL {bol_number}: {e} {error_detail}"
}
)
if isinstance(resource, Contact):
try:
logger.warning(f"Reintentando creación de Contact para BOL {bol_number} con phone 3057260275")
new_resource = resource.model_copy(update={"phone": "3057260275"})
resp = await self._request_with_reauth(
"POST",
url_path,
json=new_resource.model_dump(),
)
return bol_number, role, {"id": resp.json().get("id")}
except Exception as e2:
error_detail2 = ""
if hasattr(e2, "response") and e2.response is not None:
try:
error_json = e2.response.json()
error_detail2 = error_json.get("detail", str(error_json))
except Exception:
error_detail2 = e2.response.text
else:
error_detail2 = str(e2)
logger.exception(f"Segundo intento fallido creando Contact para BOL {bol_number}: {e2} {error_detail2}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error creando (Contact) para BOL {bol_number}: {e2} {error_detail2}"
}
)
return bol_number, role, {"id": ""}
async def create_accounts_id(self, primus_results: list):
"""
Crea cuentas en el CRM para los shippers y consignees basados en los resultados de Primus.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
Returns:
dict: Un diccionario que contiene los IDs de las cuentas creadas,
separados por "shipper" y "consignee", indexados por número de BOL.
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
for role in ("shipper", "consignee"):
account = build_account_obj(primus_result, role)
if account:
tasks.append(self._post_resource(bol_number, role, account, "account"))
else:
logger.warning(f"No se pudo construir Account {role} para BOL {bol_number}, se omite")
results = await asyncio.gather(*tasks)
return {"shipper": {bol: acc for bol, role, acc in results if role == "shipper"},
"consignee": {bol: acc for bol, role, acc in results if role == "consignee"}}
async def create_contacts_id(self, primus_results: list, accounts_mapping: dict):
"""
Crea contactos en el CRM para los shippers, consignees y contactos hazmat.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
accounts_mapping (dict): Un diccionario con los IDs de las cuentas ya creadas.
Returns:
dict: Un diccionario que contiene los IDs de los contactos creados,
separados por "shipper", "consignee" y "hazmat", indexados por número de BOL.
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
for role in ("shipper", "consignee"):
account_id = accounts_mapping[role].get(bol_number, {}).get("id")
if account_id:
contact = build_contact_obj(primus_result, role, account_id)
if contact:
tasks.append(self._post_resource(bol_number, role, contact, "contact"))
# hazmat
if evaluate_if_need_hazmat_contact(primus_result):
consignee_id = accounts_mapping["consignee"].get(bol_number, {}).get("id")
if consignee_id:
hazmat_contact = build_hazmat_contact_obj(primus_result, consignee_id)
if hazmat_contact:
tasks.append(self._post_resource(bol_number, "hazmat", hazmat_contact, "contact"))
results = await asyncio.gather(*tasks)
out = {"shipper": {}, "consignee": {}, "hazmat": {}}
for bol, role, cont in results:
out[role][bol] = cont
return out
async def _get_customer_id(self, bol_number: str, third_party: str):
"""
Obtiene el ID de un cliente (customer) del CRM basado en el third_party.
Args:
bol_number (str): El número de BOL asociado.
third_party (str): El identificador del tercer partido.
Returns:
tuple: Una tupla que contiene el número de BOL y un diccionario con el ID del cliente.
Si falla, retorna un diccionario con un ID vacío.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth("GET", f"account/tai/{third_party}")
response = resp.json()
return bol_number, {"customer_id": response[0].get("id")}
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error obteniendo customer_id para BOL {bol_number}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error obteniendo customer_id para BOL {bol_number}: {e} - {error_detail}"
}
)
return bol_number, {"customer_id": ""}
async def _get_mode_id(self, bol_number: str, mode: str):
"""
Obtiene el ID de un modo de transporte (mode) del CRM.
Args:
bol_number (str): El número de BOL asociado.
mode (str): El nombre del modo de transporte.
Returns:
tuple: Una tupla que contiene el número de BOL y un diccionario con el ID del modo.
Si falla, retorna un diccionario con un ID vacío.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth("GET", "mode", params={"department": mode})
response = resp.json()
return bol_number, {"mode_id": next((item["id"] for item in response if item["name"] == mode), "")}
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error obteniendo mode_id para BOL {bol_number}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error obteniendo mode_id para BOL {bol_number}: {e} - {error_detail}"
}
)
return bol_number, {"mode_id": ""}
async def get_customer_and_mode_id(self, primus_results: list):
"""
Obtiene los IDs de cliente y modo de transporte para cada resultado de Primus.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
que contiene "customer_id" y/o "mode_id".
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
third_party = primus_result.get("thirdParty", {}).get("id")
mode = primus_result.get("vendor", {}).get("mode")
if not third_party and not mode:
logger.warning(f"No se encontró thirdParty ni mode en BOL {bol_number}, se omite")
continue
if third_party:
tasks.append(self._get_customer_id(bol_number, third_party))
if mode:
tasks.append(self._get_mode_id(bol_number, mode))
results = await asyncio.gather(*tasks)
output: dict[str, dict[str, str | None]] = {}
for bol_number, result in results:
if bol_number not in output:
output[bol_number] = {}
output[bol_number].update(result)
return output
async def _post_load(self, bol_number: str, final_load: FinalLoad):
"""
Crea una carga (load) en el CRM.
Args:
bol_number (str): El número de BOL asociado a la carga.
final_load (FinalLoad): El objeto Pydantic con los datos finales de la carga.
Returns:
tuple: Una tupla que contiene el número de BOL y un diccionario con el ID de la carga creada.
Si falla, retorna un diccionario con un ID None.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth(
"POST",
"load/v2",
json=final_load.model_dump()
)
return bol_number, {"id": resp.json().get("id")}
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error creando load para BOL {bol_number}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error creando load para BOL {bol_number}: {e} - {error_detail}"
}
)
return bol_number, {"id": None}
async def create_loads(self, final_loads: list[FinalLoad]):
"""
Crea múltiples cargas en el CRM.
Args:
final_loads (list[FinalLoad]): Una lista de objetos Pydantic con los datos finales de las cargas.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
con el ID de la carga creada.
"""
tasks = [self._post_load(fl.bol_number, fl) for fl in final_loads]
results = await asyncio.gather(*tasks)
return {bol: load_out for bol, load_out in results}
async def _post_carrier_quote(self, bol_number: str, carrier_quote: CarrierQuote):
"""
Crea una cotización de transportista (carrier quote) en el CRM.
Args:
bol_number (str): El número de BOL asociado a la cotización.
carrier_quote (CarrierQuote): El objeto Pydantic con los datos de la cotización del transportista.
Returns:
No retorna ningún valor. Envía un correo electrónico si falla la creación.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth("POST", "carrier-quote", json=carrier_quote.model_dump())
logger.info(f"Carrier quote creado para BOL {bol_number}: {resp.json()}")
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error creando Carrier quote para BOL {bol_number}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error creando Carrier quote para BOL {bol_number}: {e} - {error_detail}"
}
)
async def _post_customer_quote(self, bol_number: str, customer_quote: CustomerQuote):
"""
Crea una cotización de cliente (customer quote) en el CRM.
Args:
bol_number (str): El número de BOL asociado a la cotización.
customer_quote (CustomerQuote): El objeto Pydantic con los datos de la cotización del cliente.
Returns:
No retorna ningún valor. Envía un correo electrónico si falla la creación.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth("POST", "customer-quote", json=customer_quote.model_dump())
logger.info(f"Customer quote creado para BOL {bol_number}: {resp.json()}")
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error creando Customer quote para BOL {bol_number}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error creando Customer quote para BOL {bol_number}: {e} - {error_detail}"
}
)
async def create_quotes(self, primus_results, llm_results, loads_mapping, carrier_services):
"""
Crea cotizaciones de transportista y cliente para múltiples resultados de Primus.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
llm_results (dict): Un diccionario con los resultados del LLM, indexados por número de BOL.
loads_mapping (dict): Un diccionario con los IDs de las cargas ya creadas, indexados por número de BOL.
carrier_services (dict): Un diccionario con los servicios de transportista, indexados por número de BOL.
Returns:
No retorna ningún valor.
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
load_id = loads_mapping.get(bol_number, {}).get("id")
if not load_id:
continue
llm_entry = llm_results.get(bol_number, {})
carrier_service = carrier_services.get(bol_number, {})
carrier_quote = build_carrier_quote_object(primus_result, llm_entry, load_id, carrier_service)
customer_quote = build_customer_quote_object(primus_result, load_id)
tasks.append(self._post_carrier_quote(bol_number, carrier_quote))
tasks.append(self._post_customer_quote(bol_number, customer_quote))
await asyncio.gather(*tasks)
logger.info("Todos los CarrierQuotes y CustomerQuotes fueron procesados.")
async def _get_carrier_service_by_scacs(self, bol_number: str, scac: str):
"""
Obtiene el servicio de transportista (carrier service) del CRM por su código SCAC.
Args:
bol_number (str): El número de BOL asociado.
scac (str): El código SCAC del transportista.
Returns:
tuple: Una tupla que contiene el número de BOL y un diccionario con la información del servicio de transportista.
Si falla, retorna un diccionario vacío.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth("GET", "carrier-service/primus/scacs", params={"scacs": [scac]})
carriers = resp.json()
if scac == "FDEG":
carrier_service = next((item for item in carriers if item["name"] == "FEDEX PACKAGING"), {})
else:
carrier_service = next((item for item in carriers if item["name"].endswith("(LTL)") ), None)
if not carrier_service:
project44_carriers = [item for item in carriers if item.get("api_provider") == "Project44"]
if len(project44_carriers) == 1:
carrier_service = project44_carriers[0]
elif len(project44_carriers) > 1:
carrier_service = next(
(item for item in project44_carriers if "LTL" in item["name"]),
project44_carriers[0]
)
else:
carrier_service = {}
return bol_number, carrier_service
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error consultando carrier SCAC={scac}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error consultando carrier SCAC={scac}: {e} - {error_detail}"
}
)
return bol_number, {}
async def carrier_service(self, primus_results: list[dict]):
"""
Obtiene los servicios de transportista para múltiples resultados de Primus.
Args:
primus_results (list[dict]): Una lista de diccionarios con los resultados de Primus.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
con la información del servicio de transportista.
"""
results = await asyncio.gather(
*[self._get_carrier_service_by_scacs(str(p["BOLNumber"]),
SCACS_MAPPING.get(p.get("vendor",{}).get("SCAC"),
p.get("vendor",{}).get("SCAC")))
for p in primus_results]
)
return {bol: cs for bol, cs in results}
async def _get_account_by_name(self, bol_number: str, account_name: str):
"""
Obtiene el ID de una cuenta del CRM por su nombre.
Args:
bol_number (str): El número de BOL asociado.
account_name (str): El nombre de la cuenta a buscar.
Returns:
tuple: Una tupla que contiene el número de BOL y un diccionario con el ID de la cuenta.
Si falla, retorna un diccionario con un ID None.
"""
async with self.semaphore:
try:
resp = await self._request_with_reauth("GET", "account/name",
params={"account_name": account_name, "billing_account": True})
id = next((item["id"] for item in resp.json() if item["name"].lower() == account_name.lower()), None)
return bol_number, {"id": id}
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error consultando account={account_name}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error consultando account={account_name}: {e} - {error_detail}"
}
)
return bol_number, {"id": None}
async def create_carrier_bill_id(self, llm_results: dict[str,dict]):
"""
Crea los IDs de facturación del transportista (carrier bill ID) basándose en los resultados del LLM.
Args:
llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
con el ID de la cuenta de facturación del transportista.
"""
results = []
for bol_number, bol_data in llm_results.items():
bill_to_third_party = bol_data.get("parsed",{}).get("load",{}).get("bill_to_third_party")
logger.info(f"BOL {bol_number}, bill_to_third_party: {bill_to_third_party}")
if bill_to_third_party:
results.append(await self._get_account_by_name(bol_number, bill_to_third_party))
return {bol: out for bol, out in results}
async def _patch_load(self, bol_number: str, load_id: str, updates_data: dict):
"""
Actualiza una carga (load) existente en el CRM con los datos proporcionados.
Args:
bol_number (str): El número de BOL asociado a la carga.
load_id (str): El ID de la carga a actualizar.
updates_data (dict): Un diccionario con los datos a actualizar en la carga.
Returns:
No retorna ningún valor. Envía un correo electrónico si falla la actualización.
"""
payload = {
"billing_reference": bol_number,
"po_number": updates_data.get("po_number"),
"pro_number": updates_data.get("pro_number"),
"carrier_bill_id": updates_data.get("carrier_bill_id"),
"carrier_id": updates_data.get("carrier_id"),
"document_options_id": updates_data.get("document_options_id"),
"external_tms_integration": updates_data.get("external_tms_integration")
}
if "mode_id" in updates_data:
payload["mode_id"] = updates_data["mode_id"]
try:
await self._request_with_reauth("PATCH", f"load/{load_id}", json=payload)
logger.info(f"[OK] actualización exitosa para BOL {bol_number} (load_id={load_id})")
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"[ERROR] Falló actualización de datos para BOL {bol_number} (load_id={load_id}): {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"[ERROR] Falló actualización de datos para BOL {bol_number} (load_id={load_id}): {e} - {error_detail}"
}
)
async def update_loads(self, loads_id: dict, updates_data: dict):
"""
Actualiza múltiples cargas en el CRM.
Args:
loads_id (dict): Un diccionario con los IDs de las cargas a actualizar, indexados por número de BOL.
updates_data (dict): Un diccionario con los datos de actualización para cada carga, indexados por número de BOL.
Returns:
No retorna ningún valor.
"""
tasks = []
for bol_number, load_data in loads_id.items():
load_id = load_data.get("id")
data = updates_data.get(bol_number, {})
if load_id and data:
tasks.append(self._patch_load(bol_number, load_id, data))
await asyncio.gather(*tasks)
logger.info("Actualización de loads finalizada.")
Functions
__init__()
Inicializa el cliente de CRM.
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor. |
Source code in app/services/crm_client.py
def __init__(self):
"""
Inicializa el cliente de CRM.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.crm_base_url
self.user = settings.crm_user
self.password = settings.crm_password
self.timeout = settings.http_client_timeout_seconds
self.semaphore = asyncio.Semaphore(settings.concurrency)
self.token = None
carrier_service(primus_results)
async
Obtiene los servicios de transportista para múltiples resultados de Primus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_results
|
list[dict]
|
Una lista de diccionarios con los resultados de Primus. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
Un diccionario donde la clave es el número de BOL y el valor es un diccionario con la información del servicio de transportista. |
Source code in app/services/crm_client.py
async def carrier_service(self, primus_results: list[dict]):
"""
Obtiene los servicios de transportista para múltiples resultados de Primus.
Args:
primus_results (list[dict]): Una lista de diccionarios con los resultados de Primus.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
con la información del servicio de transportista.
"""
results = await asyncio.gather(
*[self._get_carrier_service_by_scacs(str(p["BOLNumber"]),
SCACS_MAPPING.get(p.get("vendor",{}).get("SCAC"),
p.get("vendor",{}).get("SCAC")))
for p in primus_results]
)
return {bol: cs for bol, cs in results}
create_accounts_id(primus_results)
async
Crea cuentas en el CRM para los shippers y consignees basados en los resultados de Primus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_results
|
list
|
Una lista de diccionarios con los resultados de Primus. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
Un diccionario que contiene los IDs de las cuentas creadas, separados por "shipper" y "consignee", indexados por número de BOL. |
Source code in app/services/crm_client.py
async def create_accounts_id(self, primus_results: list):
"""
Crea cuentas en el CRM para los shippers y consignees basados en los resultados de Primus.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
Returns:
dict: Un diccionario que contiene los IDs de las cuentas creadas,
separados por "shipper" y "consignee", indexados por número de BOL.
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
for role in ("shipper", "consignee"):
account = build_account_obj(primus_result, role)
if account:
tasks.append(self._post_resource(bol_number, role, account, "account"))
else:
logger.warning(f"No se pudo construir Account {role} para BOL {bol_number}, se omite")
results = await asyncio.gather(*tasks)
return {"shipper": {bol: acc for bol, role, acc in results if role == "shipper"},
"consignee": {bol: acc for bol, role, acc in results if role == "consignee"}}
create_carrier_bill_id(llm_results)
async
Crea los IDs de facturación del transportista (carrier bill ID) basándose en los resultados del LLM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
llm_results
|
dict[str, dict]
|
Un diccionario con los resultados del LLM, indexados por número de BOL. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
Un diccionario donde la clave es el número de BOL y el valor es un diccionario con el ID de la cuenta de facturación del transportista. |
Source code in app/services/crm_client.py
async def create_carrier_bill_id(self, llm_results: dict[str,dict]):
"""
Crea los IDs de facturación del transportista (carrier bill ID) basándose en los resultados del LLM.
Args:
llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
con el ID de la cuenta de facturación del transportista.
"""
results = []
for bol_number, bol_data in llm_results.items():
bill_to_third_party = bol_data.get("parsed",{}).get("load",{}).get("bill_to_third_party")
logger.info(f"BOL {bol_number}, bill_to_third_party: {bill_to_third_party}")
if bill_to_third_party:
results.append(await self._get_account_by_name(bol_number, bill_to_third_party))
return {bol: out for bol, out in results}
create_contacts_id(primus_results, accounts_mapping)
async
Crea contactos en el CRM para los shippers, consignees y contactos hazmat.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_results
|
list
|
Una lista de diccionarios con los resultados de Primus. |
required |
accounts_mapping
|
dict
|
Un diccionario con los IDs de las cuentas ya creadas. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
Un diccionario que contiene los IDs de los contactos creados, separados por "shipper", "consignee" y "hazmat", indexados por número de BOL. |
Source code in app/services/crm_client.py
async def create_contacts_id(self, primus_results: list, accounts_mapping: dict):
"""
Crea contactos en el CRM para los shippers, consignees y contactos hazmat.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
accounts_mapping (dict): Un diccionario con los IDs de las cuentas ya creadas.
Returns:
dict: Un diccionario que contiene los IDs de los contactos creados,
separados por "shipper", "consignee" y "hazmat", indexados por número de BOL.
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
for role in ("shipper", "consignee"):
account_id = accounts_mapping[role].get(bol_number, {}).get("id")
if account_id:
contact = build_contact_obj(primus_result, role, account_id)
if contact:
tasks.append(self._post_resource(bol_number, role, contact, "contact"))
# hazmat
if evaluate_if_need_hazmat_contact(primus_result):
consignee_id = accounts_mapping["consignee"].get(bol_number, {}).get("id")
if consignee_id:
hazmat_contact = build_hazmat_contact_obj(primus_result, consignee_id)
if hazmat_contact:
tasks.append(self._post_resource(bol_number, "hazmat", hazmat_contact, "contact"))
results = await asyncio.gather(*tasks)
out = {"shipper": {}, "consignee": {}, "hazmat": {}}
for bol, role, cont in results:
out[role][bol] = cont
return out
create_loads(final_loads)
async
Crea múltiples cargas en el CRM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
final_loads
|
list[FinalLoad]
|
Una lista de objetos Pydantic con los datos finales de las cargas. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
Un diccionario donde la clave es el número de BOL y el valor es un diccionario con el ID de la carga creada. |
Source code in app/services/crm_client.py
async def create_loads(self, final_loads: list[FinalLoad]):
"""
Crea múltiples cargas en el CRM.
Args:
final_loads (list[FinalLoad]): Una lista de objetos Pydantic con los datos finales de las cargas.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
con el ID de la carga creada.
"""
tasks = [self._post_load(fl.bol_number, fl) for fl in final_loads]
results = await asyncio.gather(*tasks)
return {bol: load_out for bol, load_out in results}
create_quotes(primus_results, llm_results, loads_mapping, carrier_services)
async
Crea cotizaciones de transportista y cliente para múltiples resultados de Primus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_results
|
list
|
Una lista de diccionarios con los resultados de Primus. |
required |
llm_results
|
dict
|
Un diccionario con los resultados del LLM, indexados por número de BOL. |
required |
loads_mapping
|
dict
|
Un diccionario con los IDs de las cargas ya creadas, indexados por número de BOL. |
required |
carrier_services
|
dict
|
Un diccionario con los servicios de transportista, indexados por número de BOL. |
required |
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor. |
Source code in app/services/crm_client.py
async def create_quotes(self, primus_results, llm_results, loads_mapping, carrier_services):
"""
Crea cotizaciones de transportista y cliente para múltiples resultados de Primus.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
llm_results (dict): Un diccionario con los resultados del LLM, indexados por número de BOL.
loads_mapping (dict): Un diccionario con los IDs de las cargas ya creadas, indexados por número de BOL.
carrier_services (dict): Un diccionario con los servicios de transportista, indexados por número de BOL.
Returns:
No retorna ningún valor.
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
load_id = loads_mapping.get(bol_number, {}).get("id")
if not load_id:
continue
llm_entry = llm_results.get(bol_number, {})
carrier_service = carrier_services.get(bol_number, {})
carrier_quote = build_carrier_quote_object(primus_result, llm_entry, load_id, carrier_service)
customer_quote = build_customer_quote_object(primus_result, load_id)
tasks.append(self._post_carrier_quote(bol_number, carrier_quote))
tasks.append(self._post_customer_quote(bol_number, customer_quote))
await asyncio.gather(*tasks)
logger.info("Todos los CarrierQuotes y CustomerQuotes fueron procesados.")
get_customer_and_mode_id(primus_results)
async
Obtiene los IDs de cliente y modo de transporte para cada resultado de Primus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_results
|
list
|
Una lista de diccionarios con los resultados de Primus. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
Un diccionario donde la clave es el número de BOL y el valor es un diccionario que contiene "customer_id" y/o "mode_id". |
Source code in app/services/crm_client.py
async def get_customer_and_mode_id(self, primus_results: list):
"""
Obtiene los IDs de cliente y modo de transporte para cada resultado de Primus.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
que contiene "customer_id" y/o "mode_id".
"""
tasks = []
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
third_party = primus_result.get("thirdParty", {}).get("id")
mode = primus_result.get("vendor", {}).get("mode")
if not third_party and not mode:
logger.warning(f"No se encontró thirdParty ni mode en BOL {bol_number}, se omite")
continue
if third_party:
tasks.append(self._get_customer_id(bol_number, third_party))
if mode:
tasks.append(self._get_mode_id(bol_number, mode))
results = await asyncio.gather(*tasks)
output: dict[str, dict[str, str | None]] = {}
for bol_number, result in results:
if bol_number not in output:
output[bol_number] = {}
output[bol_number].update(result)
return output
login()
async
Autentica con el CRM y guarda el token de acceso.
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor, pero almacena el token de acceso en self.token. |
|
|
Levanta una excepción si la autenticación falla. |
Source code in app/services/crm_client.py
async def login(self):
"""
Autentica con el CRM y guarda el token de acceso.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor, pero almacena el token de acceso en self.token.
Levanta una excepción si la autenticación falla.
"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(
f"{self.base_url}/auth/access-token-v2",
data={
"grant_type": "password",
"username": self.user,
"password": self.password,
"scope": "",
"client_id": "",
"client_secret": ""
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
resp.raise_for_status()
self.token = resp.json().get("access_token")
logger.info("CRM login OK, nuevo token obtenido")
update_loads(loads_id, updates_data)
async
Actualiza múltiples cargas en el CRM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
loads_id
|
dict
|
Un diccionario con los IDs de las cargas a actualizar, indexados por número de BOL. |
required |
updates_data
|
dict
|
Un diccionario con los datos de actualización para cada carga, indexados por número de BOL. |
required |
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor. |
Source code in app/services/crm_client.py
async def update_loads(self, loads_id: dict, updates_data: dict):
"""
Actualiza múltiples cargas en el CRM.
Args:
loads_id (dict): Un diccionario con los IDs de las cargas a actualizar, indexados por número de BOL.
updates_data (dict): Un diccionario con los datos de actualización para cada carga, indexados por número de BOL.
Returns:
No retorna ningún valor.
"""
tasks = []
for bol_number, load_data in loads_id.items():
load_id = load_data.get("id")
data = updates_data.get(bol_number, {})
if load_id and data:
tasks.append(self._patch_load(bol_number, load_id, data))
await asyncio.gather(*tasks)
logger.info("Actualización de loads finalizada.")
Functions
evaluate_if_need_hazmat_contact(primus_result)
Evalúa si se necesita un contacto Hazmat basado en el contenido del primus_result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_result
|
dict
|
El resultado del procesamiento de Primus que contiene la información de la carga. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bool |
bool
|
True si se encuentra algún artículo peligroso (hazmat), False en caso contrario. |
Source code in app/services/crm_client.py
def evaluate_if_need_hazmat_contact(primus_result: dict) -> bool:
"""
Evalúa si se necesita un contacto Hazmat basado en el contenido del primus_result.
Args:
primus_result (dict): El resultado del procesamiento de Primus que contiene la información de la carga.
Returns:
bool: True si se encuentra algún artículo peligroso (hazmat), False en caso contrario.
"""
lineItems = primus_result.get("freightInfo", [])
for lineItem in lineItems:
hazmat = lineItem.get("hazmat", False)
if hazmat:
return True
return False
Megatron Client
Cliente que se conecta a la API de Megatron, utilizada para realizar cálculos específicos como los pies lineales (linear feet).
app.services.megatron_client
app.services.megatron_client
Classes
MegatronClient
Cliente para interactuar con la API de Megatron.
Esta clase se encarga de calcular los pies lineales (linear feet) de los Bills of Lading (BOLs) utilizando la API de Megatron.
Source code in app/services/megatron_client.py
class MegatronClient:
"""
Cliente para interactuar con la API de Megatron.
Esta clase se encarga de calcular los pies lineales (linear feet)
de los Bills of Lading (BOLs) utilizando la API de Megatron.
"""
def __init__(self):
"""
Inicializa el cliente de Megatron.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.megatron_base_url
self.token = settings.megatron_token
async def _post_linear_feet(
self, client: httpx.AsyncClient, bol_number: str, lf_in: LinearFeet, headers: dict
) -> tuple[str, int | None]:
"""
Envía una solicitud POST a la API de Megatron para calcular los pies lineales de un solo BOL.
Args:
client (httpx.AsyncClient): Cliente HTTP asíncrono para realizar la solicitud.
bol_number (str): El número de BOL para el cual se calcularán los pies lineales.
lf_in (LinearFeet): Objeto Pydantic con los datos de entrada para el cálculo de pies lineales.
headers (dict): Diccionario de encabezados HTTP para la solicitud.
Returns:
tuple[str, dict]: Una tupla que contiene el número de BOL y un diccionario con el total de pies lineales calculados.
Si ocurre un error, retorna el número de BOL y un diccionario con 'total_linear_feet' en 0.0.
"""
try:
resp = await client.post(
f"{self.base_url}/linear-feet/calculate",
headers=headers,
json=lf_in.model_dump()
)
resp.raise_for_status()
return bol_number, {"total_linear_feet":resp.json().get("total_linear_feet", 0.0)}
except Exception as e:
error_detail = ""
if hasattr(e, "response") and e.response is not None:
try:
error_json = e.response.json()
error_detail = error_json.get("detail", str(error_json))
except Exception:
error_detail = e.response.text
else:
error_detail = str(e)
logger.exception(f"Error calculating linear feet for BOL {bol_number}: {e} - {error_detail}")
email_service.send_email(
dynamic_data={
"bol_number": bol_number,
"error_message": f"Error calculating linear feet for BOL {bol_number}: {e} - {error_detail}"
}
)
return bol_number, {"total_linear_feet": 0.0}
async def calculate_linear_feet_per_bol(self, primus_results: list, llm_results: dict[str,dict]):
"""
Calcula los pies lineales para múltiples Bills of Lading (BOLs) en paralelo.
Para cada BOL, construye el objeto LinearFeetIn, realiza una llamada a la API de Megatron
y devuelve los resultados.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
que contiene el total de pies lineales calculados.
"""
headers = {
"access-token": self.token,
"Content-Type": "application/json"
}
tasks = []
async with httpx.AsyncClient() as client:
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
lf_in = build_linear_feet_obj(primus_result, llm_results)
if not lf_in:
logger.warning(f"No hay data LLM para BOL {bol_number}, se omite")
continue
tasks.append(
self._post_linear_feet(client, bol_number, lf_in, headers)
)
results = await asyncio.gather(*tasks, return_exceptions=False)
return {bol: lf_out for bol, lf_out in results}
Functions
__init__()
Inicializa el cliente de Megatron.
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor. |
Source code in app/services/megatron_client.py
def __init__(self):
"""
Inicializa el cliente de Megatron.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.base_url = settings.megatron_base_url
self.token = settings.megatron_token
calculate_linear_feet_per_bol(primus_results, llm_results)
async
Calcula los pies lineales para múltiples Bills of Lading (BOLs) en paralelo.
Para cada BOL, construye el objeto LinearFeetIn, realiza una llamada a la API de Megatron y devuelve los resultados.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
primus_results
|
list
|
Una lista de diccionarios con los resultados de Primus. |
required |
llm_results
|
dict[str, dict]
|
Un diccionario con los resultados del LLM, indexados por número de BOL. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
dict |
Un diccionario donde la clave es el número de BOL y el valor es un diccionario que contiene el total de pies lineales calculados. |
Source code in app/services/megatron_client.py
async def calculate_linear_feet_per_bol(self, primus_results: list, llm_results: dict[str,dict]):
"""
Calcula los pies lineales para múltiples Bills of Lading (BOLs) en paralelo.
Para cada BOL, construye el objeto LinearFeetIn, realiza una llamada a la API de Megatron
y devuelve los resultados.
Args:
primus_results (list): Una lista de diccionarios con los resultados de Primus.
llm_results (dict[str, dict]): Un diccionario con los resultados del LLM, indexados por número de BOL.
Returns:
dict: Un diccionario donde la clave es el número de BOL y el valor es un diccionario
que contiene el total de pies lineales calculados.
"""
headers = {
"access-token": self.token,
"Content-Type": "application/json"
}
tasks = []
async with httpx.AsyncClient() as client:
for primus_result in primus_results:
bol_number = str(primus_result["BOLNumber"])
lf_in = build_linear_feet_obj(primus_result, llm_results)
if not lf_in:
logger.warning(f"No hay data LLM para BOL {bol_number}, se omite")
continue
tasks.append(
self._post_linear_feet(client, bol_number, lf_in, headers)
)
results = await asyncio.gather(*tasks, return_exceptions=False)
return {bol: lf_out for bol, lf_out in results}
Functions
Email Service
Servicio de utilidad para el envío de notificaciones por correo electrónico, principalmente en caso de errores durante el proceso.
app.services.send_email
app.services.send_email
Classes
EmailService
Servicio para el envío de correos electrónicos utilizando SendGrid.
Esta clase encapsula la lógica para enviar correos electrónicos, especialmente para notificaciones de errores, utilizando una plantilla dinámica.
Source code in app/services/send_email.py
class EmailService:
"""
Servicio para el envío de correos electrónicos utilizando SendGrid.
Esta clase encapsula la lógica para enviar correos electrónicos,
especialmente para notificaciones de errores, utilizando una plantilla dinámica.
"""
def __init__(self):
"""
Inicializa el cliente de SendGrid.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.sg_client = SendGridAPIClient(settings.sendgrid_api_key)
def send_email(
self, *, dynamic_data: dict
):
"""
Envía un correo electrónico utilizando una plantilla dinámica de SendGrid.
Args:
dynamic_data (dict): Un diccionario con los datos dinámicos que se inyectarán en la plantilla del correo.
Debe contener al menos una clave "subject" si se desea un asunto personalizado,
de lo contrario, se usará un asunto predeterminado.
Returns:
None: Este método no retorna ningún valor explícito. Registra un error si el envío falla.
"""
sg_message = Mail(
from_email=settings.sendgrid_from,
to_emails=settings.sendgrid_to,
)
sg_message.template_id = settings.sendgrid_id_template
sg_message.dynamic_template_data = {
"subject": "Primus load creation failed",
**dynamic_data,
}
try:
self.sg_client.send(sg_message)
except Exception as e:
logger.error(f"[ERROR] enviando email de notificación: {str(e)}")
return None
Functions
__init__()
Inicializa el cliente de SendGrid.
Returns:
| Type | Description |
|---|---|
|
No retorna ningún valor. |
Source code in app/services/send_email.py
def __init__(self):
"""
Inicializa el cliente de SendGrid.
Args:
No recibe argumentos.
Returns:
No retorna ningún valor.
"""
self.sg_client = SendGridAPIClient(settings.sendgrid_api_key)
send_email(*, dynamic_data)
Envía un correo electrónico utilizando una plantilla dinámica de SendGrid.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dynamic_data
|
dict
|
Un diccionario con los datos dinámicos que se inyectarán en la plantilla del correo. Debe contener al menos una clave "subject" si se desea un asunto personalizado, de lo contrario, se usará un asunto predeterminado. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
None |
Este método no retorna ningún valor explícito. Registra un error si el envío falla. |
Source code in app/services/send_email.py
def send_email(
self, *, dynamic_data: dict
):
"""
Envía un correo electrónico utilizando una plantilla dinámica de SendGrid.
Args:
dynamic_data (dict): Un diccionario con los datos dinámicos que se inyectarán en la plantilla del correo.
Debe contener al menos una clave "subject" si se desea un asunto personalizado,
de lo contrario, se usará un asunto predeterminado.
Returns:
None: Este método no retorna ningún valor explícito. Registra un error si el envío falla.
"""
sg_message = Mail(
from_email=settings.sendgrid_from,
to_emails=settings.sendgrid_to,
)
sg_message.template_id = settings.sendgrid_id_template
sg_message.dynamic_template_data = {
"subject": "Primus load creation failed",
**dynamic_data,
}
try:
self.sg_client.send(sg_message)
except Exception as e:
logger.error(f"[ERROR] enviando email de notificación: {str(e)}")
return None