Integrate Salesforce as a custom tool in an AI Agent

Do you want a simple way to integrate Salesforce into your smolagents agents? There is a simple way to achieve this. First, let’s add smolagents, python-dotenv, and simple-salesforce to the requirements.txt file:

smolagents[gradio]
python-dotenv
simple-salesforce

Next, let’s code a simple AI agent:

import os
import yaml
from dotenv import load_dotenv
from simple_salesforce import Salesforce
from smolagents import (CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool)
from Gradio_UI import GradioUI

load_dotenv('.env')


@tool
def get_insurance_policies_by_status(status: str) -> str:
    """A tool that fetches the number of insurance policies from Salesforce based on their status.

    Args:
        status: The status of the insurance policies to fetch (e.g., 'Active', 'Inactive').

    Returns:
        A string indicating the number of insurance policies with the specified status.
    """
    try:
        # Load Salesforce credentials from environment variables
        sf_username = os.getenv("SALESFORCE_USERNAME")
        sf_password = os.getenv("SALESFORCE_PASSWORD")
        sf_token = os.getenv("SALESFORCE_SECURITY_TOKEN")

        # Connect to Salesforce
        sf = Salesforce(username=sf_username, password=sf_password, security_token=sf_token)
        print("Connected to Salesforce.")

        # Query Salesforce for insurance policies with the given status
        query = f"SELECT COUNT() FROM InsurancePolicy__c WHERE Status__c = '{status}'"
        result = sf.query(query)

        # Extract the count from the query result
        policies_count = result['totalSize']
        return f"The number of insurance policies with status '{status}' is: {policies_count}"
    except Exception as e:
        return f"Error fetching insurance policies with status '{status}': {str(e)}"


model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='Qwen/Qwen2.5-Coder-32B-Instruct',
    custom_role_conversions=None,
)

with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)

agent = CodeAgent(
    model=model,
    tools=[get_insurance_policies_by_status],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)

GradioUI(agent).launch()

The core of our AI agent is the get_insurance_policies_by_status tool: it receives the status from the LLM, calls Salesforce using simple_salesforce, and returns the number of active/inactive policies. ...
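One thing to be aware of: the f-string interpolates the LLM-provided status straight into SOQL. If you want to guard against stray quotes, a small escaping helper (mine, not part of the article) could look like this:

def escape_soql_literal(value: str) -> str:
    """Escape backslashes and single quotes for use inside a SOQL string literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'")

status = "Active"  # example input; in the agent this comes from the LLM
query = f"SELECT COUNT() FROM InsurancePolicy__c WHERE Status__c = '{escape_soql_literal(status)}'"
print(query)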

April 7, 2025 · 2 min · Alex Popescu

AWS Lambda With Mangum and FastAPI

Do you want a simple way to deploy a Python FastAPI app on AWS Lambda? There is a simple way to do this with Mangum. First, let’s add FastAPI, Mangum, and the SAM CLI to the requirements.txt file:

fastapi
mangum
aws-sam-cli

Next, let’s code a simple FastAPI sample app:

from fastapi import FastAPI
from mangum import Mangum

app = FastAPI()


@app.get("/high-scores")
async def high_scores():
    return {"scores": [100, 200, 300]}


@app.post("/insert-score")
async def insert_score(data: dict):
    return {"status": "success", "data": data}


lambda_handler = Mangum(app, lifespan="off")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", port=5000, log_level="info", reload=True)

Next, let’s set up SAM with a template.yml file: ...
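Before deploying, you can exercise the same routes locally with FastAPI's TestClient (requires httpx; importing from app assumes the snippet above is saved as app.py, which the uvicorn line implies):

from fastapi.testclient import TestClient

from app import app  # assumes the FastAPI snippet above lives in app.py

client = TestClient(app)

# GET /high-scores returns the hard-coded scores
response = client.get("/high-scores")
assert response.status_code == 200
assert response.json() == {"scores": [100, 200, 300]}

# POST /insert-score echoes the payload back
response = client.post("/insert-score", json={"player": "alex", "score": 150})
assert response.json()["status"] == "success"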

March 26, 2025 · 2 min · Alex Popescu

Observability with Elastic APM and Flask

In our last article here, we explained how to send Docker and application logs to ELK. Now is the time to add some observability to our app using Elastic APM.

Configure Elastic APM

First, we add Elastic APM to our Docker Compose file from last time, using the same version as Elasticsearch.

apm-server:
  image: docker.elastic.co/apm/apm-server:7.17.24
  container_name: apm-server
  user: apm-server
  ports:
    - "8200:8200"
  volumes:
    - ./apm-server.docker.yml:/usr/share/apm-server/apm-server.yml:ro
  command: >
    --strict.perms=false -e
    -E output.elasticsearch.hosts=["http://elasticsearch:9200"]

Next, run the containers using the docker compose up command and wait a few minutes. ...
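On the application side, the Flask app only needs the elastic-apm package and a few config keys pointing at the apm-server container; the service name and route below are illustrative assumptions, not taken from the article:

from flask import Flask
from elasticapm.contrib.flask import ElasticAPM

app = Flask(__name__)

# Hypothetical APM settings; adjust SERVICE_NAME and SERVER_URL to your setup
app.config["ELASTIC_APM"] = {
    "SERVICE_NAME": "flask-demo",
    "SERVER_URL": "http://apm-server:8200",  # the apm-server container above
    "ENVIRONMENT": "dev",
}
apm = ElasticAPM(app)


@app.route("/")
def index():
    return {"status": "ok"}


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)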

December 23, 2024 · 1 min · Alex Popescu

AWS Lambda and CORS preflight response

Are you struggling with a frontend application that wants to use a backend AWS Lambda API? Do you have the following CORS problem:

Request header field content-type is not allowed by Access-Control-Allow-Headers in preflight response

The solution is simple: implement the HTTP OPTIONS method and respond with the following access control headers: Access-Control-Allow-Origin, Access-Control-Allow-Headers, Access-Control-Allow-Methods.

For example, in Python you can do something like this:

import json

def return_200():
    return {
        'statusCode': 200,
        'headers': {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET,HEAD,OPTIONS,POST,PUT',
            'Access-Control-Allow-Headers': 'Origin, X-Requested-With, Content-Type, Accept, Authorization',
            'Content-Type': 'application/json'
        },
        'body': json.dumps({'message': 'ok'})
    }

Documentation and links:

Request header field content-type is not allowed - https://answers.netlify.com/t/request-header-field-content-type-is-not-allowed-by-access-control-allow-headers-in-preflight-response/54410
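A minimal sketch of how the Lambda handler could route the preflight (it reuses return_200 from above; the event shape assumes the API Gateway REST / Lambda proxy integration, and the handler name is illustrative):

import json

def lambda_handler(event, context):
    # The Lambda proxy integration puts the HTTP verb in event['httpMethod']
    if event.get('httpMethod') == 'OPTIONS':
        # Answer the CORS preflight with the access control headers from return_200() above
        return return_200()

    # ...normal GET/POST handling would go here...
    return {
        'statusCode': 200,
        'headers': {'Access-Control-Allow-Origin': '*'},
        'body': json.dumps({'message': 'handled'})
    }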

December 9, 2024 · 1 min · Alex Popescu

Developing a Sample TODO API with Couchbase in Docker

Have you ever wanted to run Couchbase on Docker to freely build and test your app? Here’s a simple guide to help you set up Couchbase in Docker and build a sample TODO app using Python and FastAPI.

Developer Cluster Setup

First, let’s create a minimal docker-compose.yml file to spin up Couchbase:

services:
  couchbase:
    image: couchbase:latest
    container_name: couchbase
    ports:
      - "8091:8091"   # Couchbase Web Console
      - "8092:8092"   # Query Service
      - "8093:8093"   # Full Text Search
      - "11210:11210" # Data Service
    environment:
      COUCHBASE_ADMINISTRATOR_USERNAME: admin
      COUCHBASE_ADMINISTRATOR_PASSWORD: password
    volumes:
      - couchbase_data:/opt/couchbase/var
      - ./init_bucket.sh:/init_bucket.sh
      - ./start-couchbase.sh:/start-couchbase.sh
    # Get the image entry point using: docker inspect -f '{{.Config.Entrypoint}}' couchbase
    # For the current image it is /entrypoint.sh
    command: ["/bin/bash", "/start-couchbase.sh"]
    mem_limit: 1024m # Limit the container's memory usage

volumes:
  couchbase_data:

To make it work, we need two shell scripts: start-couchbase.sh and init_bucket.sh. ...
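Once the cluster is up, a quick way to talk to it from Python is the Couchbase SDK (4.x imports shown); the bucket name 'todo' and the document shape below are assumptions for illustration, not necessarily what the article's FastAPI code uses:

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions

# Connect with the credentials from the compose file above
auth = PasswordAuthenticator("admin", "password")
cluster = Cluster("couchbase://localhost", ClusterOptions(auth))

# 'todo' is a hypothetical bucket, e.g. one created by init_bucket.sh
bucket = cluster.bucket("todo")
collection = bucket.default_collection()

# Insert and read back a sample TODO document
collection.upsert("todo::1", {"title": "Write blog post", "done": False})
print(collection.get("todo::1").content_as[dict])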

October 27, 2024 · 5 min · Alex Popescu

Almost Wrong Way To Send Docker Container Logs To ELK

In this article, we’ll walk through setting up a Docker-based ELK (Elasticsearch, Logstash, and Kibana) stack to collect, view, and send Docker logs.

services:
  elasticsearch:
    image: elasticsearch:7.17.24
    environment:
      - discovery.type=single-node
    volumes:
      - ./elasticsearch_data/:/usr/share/elasticsearch/data
    mem_limit: "1g"

  redis-cache:
    image: redis:7.4.0

  logstash-agent:
    image: logstash:7.17.24
    volumes:
      - ./logstash-agent:/etc/logstash
    command: logstash -f /etc/logstash/logstash.conf
    depends_on:
      - elasticsearch
    ports:
      - 12201:12201/udp

  logstash-central:
    image: logstash:7.17.24
    volumes:
      - ./logstash-central:/etc/logstash
    command: logstash -f /etc/logstash/logstash.conf
    depends_on:
      - elasticsearch

  kibana:
    image: kibana:7.17.24
    ports:
      - 5601:5601
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch

Elasticsearch

Just create a folder named elasticsearch_data for storing data. ...
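The logstash-agent service publishes 12201/udp, the conventional GELF port. Assuming its logstash.conf uses a gelf input on that port, you can check the pipeline end to end by pushing a hand-built GELF message at it (this test snippet is mine, not from the article):

import json
import socket

# A minimal GELF 1.1 payload; a gelf input on 12201/udp should pick it up
message = {
    "version": "1.1",
    "host": "test-host",
    "short_message": "hello from a manual GELF test",
    "level": 6,  # informational
}

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(json.dumps(message).encode("utf-8"), ("localhost", 12201))
sock.close()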

October 16, 2024 · 3 min · Alex Popescu

Linux Login Update Notifier Script With Python

Introduction

Do you ever want to get a notification with the number of updates required when logging into XFCE on Debian/Ubuntu-like systems? There is a simple Python script that can do just that. It uses the notify-send command and, paired with the magic command that returns the update count, we get the following Python script:

#!/usr/bin/env python3
import os
import subprocess

# Run the apt-get command and grep the output
command = 'apt-get --simulate upgrade | grep "upgraded.*newly installed"'
output = subprocess.getoutput(command)

# If there's output, send it as a notification, otherwise send a default message
if output:
    os.system(f'notify-send "Upgrade Check" "{output}"')
else:
    os.system('notify-send "Upgrade Check" "No upgrades available or no packages to be installed."')

# Check if the file /var/run/reboot-required exists
reboot_file = '/var/run/reboot-required'
if os.path.exists(reboot_file):
    os.system('notify-send "System Update" "Reboot is required to complete updates."')

All you need is to make it executable: ...
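os.system passes the notification text through a shell, so quoting can bite if the output ever contains special characters. A small variant using subprocess.run (my tweak, not from the article) sidesteps that:

import subprocess

def notify(title: str, body: str) -> None:
    # Pass title and body as separate argv entries so no shell quoting is involved
    subprocess.run(["notify-send", title, body], check=False)

notify("Upgrade Check", "3 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.")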

August 29, 2024 · 2 min · Alex Popescu

Automated Fake Database Population with Python

Introduction

In this article, we’ll explore a Python script that leverages the mimesis library to populate an (Azure) SQL database with fake data.

The Code

import logging
import random

import pandas as pd
import pymssql
import sqlalchemy
from dotenv import dotenv_values
from mimesis import Address, Datetime, Person
from mimesis.enums import Gender
from sqlalchemy import create_engine

# Load environment variables
config = dotenv_values(".env")

# Configure logging to the console
logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
rootLogger = logging.getLogger()
rootLogger.setLevel(logging.INFO)

consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)


def create_rows_mimesis(num=1):
    gender = random.choice([Gender.FEMALE, Gender.MALE])
    output = [{"first_name": person.first_name(gender),
               "last_name": person.last_name(gender),
               "address": address.address(),
               "email": person.email(),
               "city": address.city(),
               "state": address.state(),
               "date_time": datetime.datetime(),
               "randomdata": random.randint(1000, 2000)
               } for x in range(num)]
    return output


try:
    # Create SQLAlchemy engine
    engine = create_engine(config["CONNECTION_STRING"])

    # Connect to the database
    with engine.connect() as conn:
        logging.info(f"Connected to database: {engine}")

        # Initialize mimesis objects
        person = Person('en')
        address = Address()
        datetime = Datetime()

        num_rows = int(config["ROWS"])
        rows_per_batch = int(config["INSERT_LIMIT"])
        logging.info(f"Generating {num_rows} rows")

        if num_rows > rows_per_batch:
            for i in range(0, num_rows, rows_per_batch):
                batch_df = pd.DataFrame(create_rows_mimesis(min(rows_per_batch, num_rows - i)))
                batch_df.to_sql(config["TABLE_NAME"], engine, method='multi', index=False, if_exists='append')
                logging.info(f"Inserted {min(rows_per_batch, num_rows - i)} rows into table: {config['TABLE_NAME']}")
        else:
            df = pd.DataFrame(create_rows_mimesis(num_rows))
            df.to_sql(config["TABLE_NAME"], engine, method='multi', index=False, if_exists='replace')
            logging.info(f"Inserted {num_rows} rows into table: {config['TABLE_NAME']}")

        conn.commit()
except Exception as e:
    logging.error(f"An error occurred: {str(e)}")

logging.info("Database connection closed.")

The packages needed are: ...
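The script expects CONNECTION_STRING, ROWS, INSERT_LIMIT, and TABLE_NAME in the .env file. For the Azure SQL / pymssql combination, the SQLAlchemy URL typically uses the mssql+pymssql dialect; the server and credentials below are placeholders, not values from the article:

from sqlalchemy import create_engine

# Placeholder values; in the script these come from the .env file
user = "sqladmin"
password = "change-me"
server = "myserver.database.windows.net"
database = "demo"

connection_string = f"mssql+pymssql://{user}:{password}@{server}:1433/{database}"
engine = create_engine(connection_string)
print(engine)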

March 17, 2024 · 3 min · Alex Popescu