Using LLamaIndex Workflow to Implement an Agent Handoff Feature Like OpenAI Swarm
Example: a customer service chatbot project

Using LLamaIndex Workflow to Implement an Agent Handoff Feature Like OpenAI Swarm. Image by DALL-E-3

Happy Lunar New Year, my friends!

In the last article, I introduced LlamaIndex's Workflow framework.

Deep Dive into LlamaIndex Workflow: Event-Driven LLM Architecture

Today, I will show you how to use LlamaIndex Workflow to implement a multi-agent orchestration feature similar to OpenAI Swarm, using a customer service chatbot project as an example.


Introduction

Remember the Swarm framework released by OpenAI not long ago? Its biggest feature is agents and handoffs.

The agents are straightforward: they use a set of specific commands and tools to get tasks done. It's like putting an LLM function call into a neat package.

And handoffs are different. They allow an agent to pass the work to another agent seamlessly based on the context of the current conversation, making agents work together without any hiccups.

Why this is important

Let’s look at a diagram explaining the whole process of a ReactAgent.

The ReactAgent needs at least three accesses to LLM to complete. Image by Author

Even a simple agent call needs at least three round trips to the LLM to complete.

Traditional agent applications work this way: they keep the conversation context and user state, and the agent call chain is usually fixed. For each user request, the agents have to call the LLM multiple times to check the state, and honestly, some of those calls are unnecessary.

Here’s an example: imagine we have an e-commerce website, and we need a customer service team to answer users’ questions.

In an agent chain, agents are invoked sequentially. Image by Author

In a chained agent application, every question from a user goes to the front desk. The front desk then asks the pre-sales service; if that can't answer, it asks the after-sales service, then reorganizes the answers from the backend and replies to the customer.

Isn’t that silly? Look at all the unnecessary delays and call costs it causes!

How Swarm does it

Swarm uses a handoff approach that fits the real world better. Let me use that customer service example again:

Agent handoff allows you to interact directly with the corresponding customer service. Image by Author

Imagine a store called Swarm. When a customer asks the front desk a question, the front desk figures out what kind of question it is (pre-sale or after-sale) and passes the customer to the corresponding service. Then, the customer talks to that service directly.

Sounds reasonable, right? So why don’t we just use Swarm?

Why not just use Swarm

Because Swarm is still just an experimental framework. According to the official statement:

Swarm is currently an experimental sample framework intended to explore ergonomic interfaces for multi-agent systems. It is not intended to be used in production and therefore has no official support. (This also means we will not be reviewing PRs or issues!)

So, we can’t use Swarm directly in production systems.

But what we need is the agent handoff capability, right? Since that’s the case, why not build a similar framework yourself?

Today’s article is written for this purpose. We will develop a project using a customer service system as an example, which will use Workflow to implement agent orchestration and handoff capabilities. Let’s get started.


Project in Practice: A Customer Service Chatbot with Agent Handoff Capability

This project is quite complex. To help you understand my implementation, I have put the entire project code at the end of the article. You can freely read and modify it without my permission.

Want to know more about my work in LLM applications or the field of data science? Feel free to subscribe to my personal blog, everything is free!

Step one, set up an interactive interface

Whether you use an agent or not, you always need to adjust your prompts and code logic. At this point, a what-you-see-is-what-you-get chat UI becomes very important.

In this section, I’ll use chainlit to quickly implement a super cool web-based chat window.

Chainlit is a Python library for building chat UIs, in the spirit of Streamlit: you don't need any frontend skills to quickly build a Chatbot prototype. (Hooray)
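To give a feel for how little code a Chainlit app needs, here is a minimal hello-world sketch that is independent of this project (the handler simply echoes the user's message back):

import chainlit as cl

@cl.on_message
async def echo(message: cl.Message):
    # echo the user's message, just to show the decorator-based lifecycle
    await cl.Message(content=f"You said: {message.content}").send()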

Let’s get moving.

The scaffold of our project. Image by Author
The scaffold of our project. Image by Author

First, we create a .env file in the project’s root directory, which stores important environmental variables like OPENAI_API_KEY and OPENAI_BASE_URL. Later, I will use dotenv to read it.

This is important because, by using the .env file, you can keep the API key out of your code, so you can publish the code freely.
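For reference, here is a minimal sketch of the .env setup and how dotenv reads it (the key and URL values below are placeholders, not real credentials):

# .env (project root) -- placeholder values only
# OPENAI_API_KEY=sk-xxxx
# OPENAI_BASE_URL=https://your-llm-gateway.example.com/v1

from dotenv import load_dotenv

load_dotenv()  # reads .env and exposes the variables through os.environ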

Next, we need to set up a simple project scaffold. Our project will contain two folders: src and data. Our Python source code files will be placed in the src folder, while text source files for RAG use will be placed in the data folder.

In the src directory, first create an app.py file, which will act as the view to launch the chainlit interface. This file consists of three parts:

  1. Code to prepare the Workflow program.
  2. Code to respond to the user lifecycle, outputting intermediate processes.
  3. Actual code to call the Workflow agent and conduct the conversation.

The code flowchart is shown below:

Flowchart of the project UI interface. Image by Author

For a production-ready system, we often need to connect to an enterprise's privately deployed LLM endpoints. How to connect to a private LLM is covered in this article:

How to Connect LlamaIndex with Private LLM API Deployments

To make our customer service less rigid, we can set the temperature a bit higher. Here is the code for initializing the system environment; I will talk about the implementation of CustomerService later:

from llama_index.core import Settings
from llama_index.llms.openai_like import OpenAILike

llm = OpenAILike(
    model="qwen-max-latest",
    is_chat_model=True,
    is_function_calling_model=True,
    temperature=0.35
)
Settings.llm = llm

Imagine: when the next customer service agent takes over to answer your question, what will she do first? Right, she checks the conversation history first.

So, within the user session, we need to create a dedicated workflow for each user that preserves the conversation context and user state:

GREETINGS = "Hello, what can I do for you?"

def ready_my_workflow() -> CustomerService:
    memory = ChatMemoryBuffer(
        llm=llm,
        token_limit=5000
    )

    agent = CustomerService(
        memory=memory,
        timeout=None,
        user_state=initialize_user_state()
    )
    return agent

def initialize_user_state() -> dict[str, str | None]:
    return {
        "name": None
    }

@cl.on_chat_start
async def start():
    workflow = ready_my_workflow()
    cl.user_session.set("workflow", workflow)

    await cl.Message(
        author="assistant", content=GREETINGS
    ).send()

At the same time, I will also use chainlit’s cl.step decorator to implement a simple logging method, which can help us output some process logs on the page, letting users know where we are now:

@cl.step(type="run", show_input=False)
async def on_progress(message: str):
    return message

Then there is the main method, which is called every round of conversation.

@cl.on_message
async def main(message: cl.Message):
    workflow: CustomerService = cl.user_session.get("workflow")
    context = cl.user_session.get("context")
    msg = cl.Message(content="", author="assistant")
    user_msg = message.content
    handler = workflow.run(
        msg=user_msg,
        ctx=context
    )
    async for event in handler.stream_events():
        if isinstance(event, ProgressEvent):
            await on_progress(event.msg)

    await msg.send()
    result = await handler
    msg.content = result
    await msg.update()
    cl.user_session.set("context", handler.ctx)

In this method, we first get the user-inputted dialogue, then call the workflow’s run method to start the agent routing, while iterating through the events in the workflow pipeline and calling on_progress to output to the page. Finally, we output the result of the dialogue on the page and update the Context.

To match the construction of the chainlit interface, we can first write a simple workflow:

class CustomerService(Workflow):
    def __init__(
            self,
            llm: OpenAILike | None = None,
            memory: ChatMemoryBuffer = None,
            user_state: dict[str, str | None] = None,
            *args,
            **kwargs
    ):
        self.llm = llm or Settings.llm
        self.memory = memory or ChatMemoryBuffer()
        self.user_state = user_state
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="We're making some progress."))
        return StopEvent(result="Hello World")

Tada, our interactive interface is out:

Our UI interface for this project. Image by Author

Next, we can start preparing today's ingredients: the text source files for RAG use.

Step two, generate text files

Since this project simulates a customer support team for an online drone e-commerce website, I will set the background to an online drone store.

I need two files: one introducing the drones sold in the store and their details, and another containing common FAQs about drone use and after-sales terms.

To avoid business and data licensing issues, I plan to use LLM to generate the text I want. I specifically instructed LLM not to include any brands or real product information.

Here is a screenshot of my file generation:

Screenshot of data file generated using LLM. Image by Author

You can take my prompt as a reference:

SKUS_TEMPLATE_EN = """
    You are the owner of an online drone store, please generate a description in English of all the drones for sale.
    Include the drone model number, selling price, detailed specifications, and a detailed description in more than 400 words.
    Do not include brand names.
    No less than 20 types of drones, ranging from consumer to industrial use.
"""
TERMS_TEMPLATE_EN = """
    You are the head of a brand's back office department, and you are asked to generate a standardized response to after-sales FAQs in English that is greater than 25,000 words.
    The text should include common usage questions, as well as questions related to returns and repairs after the sale.
    This text will be used as a reference for the customer service team when answering customer questions about after-sales issues.
    Only the body text is generated, no preamble or explanation is added.
"""

Step three, handle indexing and retrieve privatized data

A foundation LLM does not contain corporate internal data. For enterprise applications, using RAG to give the LLM access to corporate private data is inevitable.

Our drone store is no exception. Before letting the agent staff start work, we need to provide them with some tools to access the product catalog and after-sales policy.

LlamaIndex provides many indexes suitable for different occasions. If used in a real system, I would prefer to use KnowledgeGraphIndex for product information text.

However, to make the sample project easy to understand, I still choose to use chromadb and VectorStoreIndex:

import chromadb
from llama_index.core import (
    SimpleDirectoryReader, StorageContext, VectorStoreIndex
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.chroma import ChromaVectorStore

def get_index(collection_name: str,
              files: list[str]) -> VectorStoreIndex:
    chroma_client = chromadb.PersistentClient(path="temp/.chroma")

    collection = chroma_client.get_or_create_collection(collection_name)
    vector_store = ChromaVectorStore(chroma_collection=collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)

    ready = collection.count()
    if ready > 0:
        print("File already loaded")
        index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    else:
        print("File not loaded.")
        docs = SimpleDirectoryReader(input_files=files).load_data()
        # embed_model is assumed to be configured elsewhere (e.g. Settings.embed_model)
        index = VectorStoreIndex.from_documents(
            docs, storage_context=storage_context, embed_model=embed_model,
            transformations=[SentenceSplitter(chunk_size=512, chunk_overlap=20)]
        )

    return index

INDEXES = {
    "SKUS": get_index("skus_docs", ["data/skus_en.txt"]),
    "TERMS": get_index("terms_docs", ["data/terms_en.txt"])
}

The running flowchart of this code is as follows:

The running flowchart of the code. Image by Author

If vector data already exists, return the index directly. If the data has not been loaded yet, first load the data into the vector store, then return the index.

Then we add a tool method to help the agent get the corresponding retriever:

async def query_docs(
        index: VectorStoreIndex, query: str,
        similarity_top_k: int = 1
) -> str:
    retriever = index.as_retriever(similarity_top_k=similarity_top_k)
    nodes = await retriever.aretrieve(query)
    result = ""
    for node in nodes:
        result += node.get_content() + "\n\n"
    return result

Step four, hire a few agents

Since we are building a smart customer service project, it is necessary to hire a few customer service agents.

We also need to set up a data class for agents, which, like a Swarm agent, includes instructions and a set of tools. Here we use an AgentConfig class, which inherits from Pydantic's BaseModel, to enforce this structure.

from llama_index.core.tools import BaseTool
from pydantic import BaseModel, ConfigDict, Field

class AgentConfig(BaseModel):
    """
    Detailed configuration for an agent
    """
    model_config = ConfigDict(arbitrary_types_allowed=True)
    name: str = Field(description="agent name")
    description: str = Field(
        description="agent description, which describes what the agent does"
    )
    system_prompt: str | None = None
    tools: list[BaseTool] | None = Field(
        description="function tools available for this agent"
    )

We also need to hire a lobby manager agent; it marks which agent the user will be handed over to.

class TransferToAgent(BaseModel):
    """Used to explain which agent to transfer to next."""
    agent_name: str = Field(description="The name of the agent to transfer to.")

We also need a RequestTransfer tool; an agent uses it to notify the workflow when it cannot answer the user's question.

class RequestTransfer(BaseModel):
    """
    Used to indicate that you don't have the necessary permission to complete the user's request,
    or that you've already completed the user's request and want to transfer to another agent.
    """
    pass

Then we need to prepare a few tools for the agents to use:

First is a login tool. It is only used to register the user's name; if you need to handle an actual login action, you can implement the details in this method. I use a closure to return a tool list.

def get_authentication_tools() -> list[BaseTool]:
    async def login(ctx: Context, username: str) -> bool:
        """When the user provides their name, you can use this method to update their status.。
        :param username The user's title or name.
        """
        if not username:
            return False

        user_state = await ctx.get("user_state", None)
        user_state["name"] = username.strip()
        await ctx.set("user_state", user_state)
        return True

    return [FunctionToolWithContext.from_defaults(async_fn=login)]

Here is a detail: since the tool needs to handle the user state saved in the workflow Context, it needs access to the ctx object. But when the tool is called by the agent, the agent is not aware of the ctx object, so we need to let the agent ignore it.

Here I modified the behavior of LlamaIndex's FunctionTool and rewrote it as a FunctionToolWithContext module. To save time, I referred to an example on the official website, which you can find here. Of course, you can also find the source code with the project code at the end of the article.

We also need tools to get the product catalog and after-sales terms. These two tools are direct calls to the retriever and are quite simple.

def get_pre_sales_tools() -> list[BaseTool]:
    async def skus_info_retrieve(ctx: Context, query: str) -> str:
        """
        When the user asks about a product, you can use this tool to look it up.
        :param query: The user's request.
        :return: The information found.
        """
        sku_info = await query_docs(INDEXES["SKUS"], query)
        return sku_info

    return [FunctionToolWithContext.from_defaults(async_fn=skus_info_retrieve)]

def get_after_sales_tools() -> list[BaseTool]:
    async def terms_info_retrieve(ctx: Context, query: str) -> str:
        """
        When the user asks about how to use a product, or about after-sales and repair options, you can use this tool to look it up.
        :param query: The user's request.
        :return: The information found.
        """
        terms_info = await query_docs(INDEXES["TERMS"], query)
        return terms_info
    return [FunctionToolWithContext.from_defaults(async_fn=terms_info_retrieve)]

To sum up, we need three professional customer service agents:

  1. The first one is the front desk, used to register the user’s visit.
  2. The second one is the pre-sales service, used to recommend various products to users.
  3. The third one is the after-sales service, used to answer various usage questions and after-sales terms.
def _get_agent_configs() -> list[AgentConfig]:
    return [
        AgentConfig(
            name="Authentication Agent",
            description="Record the user's name. If there's no name, you need to ask this from the customer.",
            system_prompt="""
            You are a front desk customer service agent for registration.
            If the user hasn't provided their name, you need to ask them.
            When the user has other requests, transfer the user's request.
            """,
            tools=get_authentication_tools()
        ),
        AgentConfig(
            name="Pre Sales Agent",
            description="When the user asks about product information, you need to consult this customer service agent.",
            system_prompt="""
            You are a customer service agent answering pre-sales questions for customers.
            You will respond to users' inquiries based on the context of the conversation.

            When the context is not enough, you will use tools to supplement the information.
            You can only handle user inquiries related to product pre-sales. 
            Please use the RequestTransfer tool to transfer other user requests.
            """,
            tools=get_pre_sales_tools()
        ),
        AgentConfig(
            name="After Sales Agent",
            description="When the user asks about after-sales information, you need to consult this customer service agent.",
            system_prompt="""
            You are a customer service agent answering after-sales questions for customers, including how to use the product, return and exchange policies, and repair solutions.
            You respond to users' inquiries based on the context of the conversation.
            When the context is not enough, you will use tools to supplement the information.
            You can only handle user inquiries related to product after-sales. 
            Please use the RequestTransfer tool to transfer other user requests.
            """,
            tools=get_after_sales_tools()
        )
    ]

According to the needs of the workflow, we also need to write two methods to register agents to the workflow.

def get_agent_config_pair() -> dict[str, AgentConfig]:
    agent_configs = _get_agent_configs()
    return {agent.name: agent for agent in agent_configs}

def get_agent_configs_str() -> str:
    agent_configs = _get_agent_configs()
    pair_list = [f"{agent.name}: {agent.description}" for agent in agent_configs]
    return "n".join(pair_list)

Finally, we also need a workflow system_prompt for orchestration, which will contain all agent information and user status, and hand over the user to the correct customer service agent when needed.

This prompt will be used directly by the workflow, no separate agent is needed, so just put the prompt here:

ORCHESTRATION_PROMPT = """  
    You are a customer service manager for a drone store.
    Based on the user's current status, latest request, and the available customer service agents, you help the user decide which agent to consult next.

    You don't focus on the dependencies between agents; the agents will handle those themselves.
    If the user asks about something unrelated to drones, you should politely and briefly decline to answer.

    Here is the list of available customer service agents:
    {agent_configs_str}

    Here is the user's current status:
    {user_state_str}
"""

Step five, build the core workflow

After so much preparation, we can finally get to the main course, and I’m sure everyone is eager to get started haha.

Since the workflow is an event-driven framework, we need to define several events like before:

class OrchestrationEvent(Event):
    query: str

class ActiveSpeakerEvent(Event):
    query: str

class ToolCallEvent(Event):
    tool_call: ToolSelection
    tools: list[BaseTool]

class ToolCallResultEvent(Event):
    chat_message: ChatMessage

class ProgressEvent(Event):
    msg: str
  1. OrchestrationEvent to indicate that the workflow needs to transfer agents.
  2. After the agent transfer is completed, ActiveSpeakerEvent will tell the workflow to use the new agent to answer the user.
  3. If the agent needs to make a Function call, ToolCallEvent will be thrown to execute concurrently.
  4. The results of concurrent execution will be thrown out with ToolCallResultEvent, and summarized into the final result.
  5. Finally, we also need a ProgressEvent to stream the intermediate steps, making it easy for users to know where we are now. To avoid too much information interference, we only output the information of agent transfer here.

After defining various events, we need to start writing the workflow. The workflow this time is a bit complex, so to make it easier for everyone to understand, I still drew a flowchart:

The flowchart of our workflow. Image by Author

First, let's look at the start method. It is relatively simple: as the entry point for the user dialogue, it stores the user's message in ChatMemory and then checks whether there is currently an active agent. If there is, it throws an ActiveSpeakerEvent and moves to the next step; if not, it throws an OrchestrationEvent and enters agent orchestration.

The flowchart of the start method. Image by Author
class CustomerService(Workflow):
    ...

    @step
    async def start(
            self, ctx: Context, ev: StartEvent
    ) -> ActiveSpeakerEvent | OrchestrationEvent:
        self.memory.put(ChatMessage(
            role="user",
            content=ev.msg
        ))
        user_state = await ctx.get("user_state", None)
        if not user_state:
            await ctx.set("user_state", self.user_state)

        user_msg = ev.msg
        active_speaker = await ctx.get("active_speaker", default=None)

        if active_speaker:
            return ActiveSpeakerEvent(query=user_msg)
        else:
            return OrchestrationEvent(query=user_msg)

Working from shallow to deep, let's look at how the orchestrate method handles agent orchestration:

The flowchart of the orchestrate method. Image by Author

This method first takes the currently available agents and the user state, then fills them into the ORCHESTRATION_PROMPT we wrote in agents.py, producing the complete system_prompt.

Then we hand the TransferToAgent tool, the system_prompt, and the chat_history to the LLM, letting it judge which agent to hand over to next.

class CustomerService(Workflow):
    ...

    @step
    async def orchestrate(
            self, ctx: Context, ev: OrchestrationEvent
    ) -> ActiveSpeakerEvent | StopEvent:
        chat_history = self.memory.get()
        user_state_str = await self._get_user_state_str(ctx)
        system_prompt = ORCHESTRATION_PROMPT.format(
            agent_configs_str=get_agent_configs_str(),
            user_state_str=user_state_str
        )
        messages = [ChatMessage(role="system", content=system_prompt)] + chat_history
        tools = [get_function_tool(TransferToAgent)]
        event, tool_calls, _ = await self.achat_to_tool_calls(ctx, tools, messages)
        if event is not None:
            return event
        tool_call = tool_calls[0]
        selected_agent = tool_call.tool_kwargs["agent_name"]
        await ctx.set("active_speaker", selected_agent)
        ctx.write_event_to_stream(
            ProgressEvent(msg=f"In step orchestrate:nTransfer to agent: {selected_agent}")
        )
        return ActiveSpeakerEvent(query=ev.query)

After getting the latest agent, we update the context and throw an ActiveSpeakerEvent.

We also need to define two helper methods: achat_to_tool_calls, which asks the LLM which tools currently need to be called, and _get_user_state_str, which converts the user state into a string.

class CustomerService(Workflow):
    ...

    async def achat_to_tool_calls(self,
                                  ctx: Context,
                                  tools: list[FunctionTool],
                                  chat_history: list[ChatMessage]
                                  ) -> tuple[StopEvent | None, list[ToolSelection], ChatResponse]:
        response = await self.llm.achat_with_tools(tools, chat_history=chat_history)
        tool_calls: list[ToolSelection] = self.llm.get_tool_calls_from_response(
            response=response, error_on_no_tool_call=False
        )
        stop_event = None
        if len(tool_calls) == 0:
            await self.memory.aput(response.message)
            stop_event = StopEvent(
                result=response.message.content
            )
        return stop_event, tool_calls, response

    @staticmethod
    async def _get_user_state_str(ctx: Context) -> str:
        user_state = await ctx.get("user_state", None)
        user_state_list = [f"{k}: {v}" for k, v in user_state.items()]
        return "n".join(user_state_list)

After studying the Orchestrate branch, let’s see how the ActiveSpeaker branch works, which is the speak_with_sub_agent method:

The flowchart of the speak_with_sub_agent method. Image by Author

This method first gets the current service-providing agent, as well as chat_history and user_state.

Then it uses the current agent's system_prompt and tools, together with the chat_history, to let the LLM judge which tools to call next.

class CustomerService(Workflow):
    ...

    @step
    async def speak_with_sub_agent(
            self, ctx: Context, ev: ActiveSpeakerEvent
    ) -> OrchestrationEvent | ToolCallEvent | StopEvent:
        active_speaker = await ctx.get("active_speaker", default="")
        agent_config: AgentConfig = get_agent_config_pair()[active_speaker]
        chat_history = self.memory.get()
        user_state_str = await self._get_user_state_str(ctx)

        system_prompt = (
                agent_config.system_prompt.strip()
                + f"nn<user state>:n{user_state_str}"
        )
        llm_input = [ChatMessage(role="system", content=system_prompt)] + chat_history
        tools = [get_function_tool(RequestTransfer)] + agent_config.tools
        event, tool_calls, response = await self.achat_to_tool_calls(ctx, tools, llm_input)

        if event is not None:
            return event
        await ctx.set("num_tool_calls", len(tool_calls))
        for tool_call in tool_calls:
            if tool_call.tool_name == "RequestTransfer":
                await ctx.set("active_speaker", None)
                ctx.write_event_to_stream(
                    ProgressEvent(msg="The agent is requesting a transfer, please hold on...")
                )
                return OrchestrationEvent(query=ev.query)
            else:
                ctx.send_event(
                    ToolCallEvent(tool_call=tool_call, tools=agent_config.tools)
                )
        await self.memory.aput(response.message)

It is important to note that although the sample project is relatively simple and each agent has only one tool, in real projects there are often multiple tools to be called concurrently. So we need to iterate through tool_calls, throwing a separate ToolCallEvent for each.

At the same time, we also need to consider the case where the current agent cannot handle the user's request and calls RequestTransfer; then we need to go back to the orchestrate step and re-select the agent.

Let's look at the handle_tool_calls section. The code here looks like a lot, but what it actually does is very simple: get the tool to be executed and execute it, so simple that I don't even want to draw a flowchart.

class CustomerService(Workflow):
    ...

    @step(num_workers=4)
    async def handle_tool_calls(
            self, ctx: Context, ev: ToolCallEvent
    ) -> ToolCallResultEvent:
        tool_call = ev.tool_call
        tools_by_name = {tool.metadata.get_name(): tool for tool in ev.tools}
        tool_msg = None
        tool = tools_by_name.get(tool_call.tool_name)
        additional_kwargs = {
            "tool_call_id": tool_call.tool_id,
            "name": tool.metadata.get_name()
        }
        if not tool:
            tool_msg = ChatMessage(
                role="tool",
                content=f"Tool {tool_call.tool_name} does not exists.",
                additional_kwargs=additional_kwargs
            )
            return ToolCallResultEvent(chat_message=tool_msg)

        try:
            if isinstance(tool, FunctionToolWithContext):
                tool_output = await tool.acall(ctx, **tool_call.tool_kwargs)
            else:
                tool_output = await tool.acall(**tool_call.tool_kwargs)

            tool_msg = ChatMessage(
                role="tool",
                content=tool_output.content,
                additional_kwargs=additional_kwargs
            )
        except Exception as e:
            tool_msg = ChatMessage(
                role="tool",
                content=f"Encountered error in tool call: {e}",
                additional_kwargs=additional_kwargs
            )
        return ToolCallResultEvent(chat_message=tool_msg)

There is a small detail here: I set the parameter num_workers=4 on the step decorator. This tells the workflow to run at most 4 of these steps concurrently, to avoid high concurrency blocking downstream systems.

Then we come to the last method, aggregate_tool_results.

class CustomerService(Workflow):
    ...

    @step
    async def aggregate_tool_results(
            self, ctx: Context, ev: ToolCallResultEvent
    ) -> ActiveSpeakerEvent | None:
        num_tool_calls = await ctx.get("num_tool_calls")
        results = ctx.collect_events(ev, [ToolCallResultEvent] * num_tool_calls)
        if not results:
            return None

        for result in results:
            await self.memory.aput(result.chat_message)
        return ActiveSpeakerEvent(query="")

This method is relatively simple: it collects all the execution results of the tool_calls, writes them into ChatMemory, and finally hands them back to the agent to evaluate the results and answer the user.

Step six, check our hard work

At this point in the story, our customer service team has been built. Let's check whether these agents are working hard. Start!

chainlit run src/app.py

Not bad, the front desk agent first asks for my name, simulating the login process, then based on my needs, hands it over to the pre-sales agent:

When my request was handed off to the pre-sales agent. Image by Author

It can also be transferred to the after-sales agent based on my request:

When my request was handed off to the aftermarket agent. Image by Author

I can even play with them all day!


Project Source Code

The code for this project is stored below, everyone can use or modify it without my permission:

GitHub – qtalen/multi-agent-customer-service


Conclusion

To summarize today’s lesson, in this customer service chatbot project, we successfully simulated the agent handoff capability of OpenAI Swarm using LlamaIndex Workflow, achieving seamless collaboration among multiple agents.

We can see that the agents handoff brings obvious advantages to the project:

  1. The workflow autonomously decides which agent to use based on the dialogue context; there is no fixed code path, and routing is decided entirely by the LLM.
  2. Once the workflow decides which agent will serve the user, the user interacts directly with the corresponding agent, without any intermediate steps.

However, there are still some shortcomings in the project:

  1. The agent calls are too low-level, forcing us to handle the function-calling process ourselves in the code.
  2. The modularity of the workflow is not well done; I have mentioned this in previous articles, and it creates obstacles for team collaboration.

I look forward to gradually solving these problems in the upcoming articles. I welcome your comments and discussions and will reply to everyone as soon as possible.


Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome – let’s discuss in the comments below!

This article was originally published on Data Leads Future.

Deep Dive into LlamaIndex Workflow: Event-Driven LLM Architecture
What I think about the progress and shortcomings after practice

Recently, LlamaIndex introduced a new feature called Workflow in one of its releases, providing event-driven orchestration and logic-decoupling capabilities for LLM applications.

In today’s article, we’ll take a deep dive into this feature through a practical mini-project, exploring what’s new and still lacking. Let’s get started.


Introduction

Why event-driven?

More and more LLM applications are shifting towards intelligent agent architectures, expecting LLMs to meet user requests through calling different APIs or multiple iterative calls.

This shift, however, brings a problem: as agent applications make more API calls, program responses slow down and code logic becomes more complex.

A typical example is ReActAgent, which involves steps like Thought, Action, Observation, and Final Answer, requiring at least three LLM calls and one tool call. If loops are needed, there will be even more I/O calls.

A typical ReAct agent will make at least three calls to LLM. Image by Author

Is there a way to optimize this?

As shown in the diagram above, in a traditional programming model, all I/O calls are linear; the next task must wait until the previous one is completed.

Although mainstream LLMs now support result generation via stream output, in agent applications, we still need to wait for the LLM to finish generating results before returning or moving to the next phase.

Actually, we don’t need all I/O calls to proceed sequentially; they can be executed concurrently, as shown in the diagram below:

In concurrent programming, multiple steps are executed in parallel. Image by Author

Does this diagram look familiar? Yes, Python’s asyncio package provides the ability to execute I/O-bound tasks concurrently, and nearly all I/O-based APIs, including LLM clients, support concurrent execution.
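As a minimal illustration of this principle (independent of LlamaIndex), the three simulated I/O calls below finish in roughly the time of the slowest one rather than the sum:

import asyncio

async def call_api(name: str, delay: float) -> str:
    # stand-in for an I/O-bound call such as an LLM or HTTP request
    await asyncio.sleep(delay)
    return f"{name} done"

async def main() -> None:
    # total wall time is about max(delays), not their sum
    results = await asyncio.gather(
        call_api("online", 1.0),
        call_api("offline", 2.0),
        call_api("prediction", 1.5),
    )
    print(results)

asyncio.run(main())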

LlamaIndex’s Workflow also utilizes the principles of concurrent programming. It goes further by not only encapsulating the details of the asyncio library but also providing an event mechanism that allows us to decouple different segments of the business process.

Now that we understand the background, let’s step through LlamaIndex Workflow with an actual project.


First Impressions

Before the main course, let’s have an appetizer by familiarizing ourselves with the elements and basic principles through a simple code example.

Importing necessary packages

First, we need to import the necessary tools. Workflow is already included in the latest version of LlamaIndex; no separate installation is needed.

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    Context,
    step,
)

Defining some events

Since Workflow is an event-driven framework, we should start by defining some events.

To avoid inconsistencies, we can first define a BaseEvent, ensuring all events use the key payload for message passing.

class BaseEvent(Event):
    payload: str | dict | None

Let’s define our first event of the day: SecondStepEvent

class SecondStepEvent(BaseEvent):
    ...

Starting simple

Next, let’s start coding our first Workflow program, which is a subclass of Workflow containing two methods:

class SimpleWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> SecondStepEvent:
        return SecondStepEvent(payload=ev.payload)

    @step
    async def second_step(self, ev: SecondStepEvent) -> StopEvent:
        return StopEvent(result=ev.payload)
  1. The method start accepts a StartEvent and then returns a SecondStepEvent.
  2. The method second_step accepts a SecondStepEvent and then returns a StopEvent.

Let’s get the code up and running to see how it works.

s_wf = SimpleWorkflow(timeout=10, verbose=True)
result = await s_wf.run(payload="hello world")
print(result)

We have turned on the verbose option so that we can see in detail how the code is executed.

The result of the execution of our first Workflow program. Image by Author

Trying out the visualization tool

LlamaIndex also generously provides a small tool that allows us to see the entire workflow process, which is very intuitive.

from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(SimpleWorkflow, filename="simple_workflow.html")
Flowchart of the first Workflow code. Image by Author

Explaining the principles

A quick look at the source code reveals that Workflow internally maintains a Context, which not only keeps an event queue but also maintains a dictionary containing each step.

Workflow uses a run_flow loop to listen for events and execute steps. Image by Author

When Workflow is initialized, the step decorator analyzes the signature of each method to determine which events it will receive and return, starting to listen to the event queue, and then storing this method in the step dictionary.

When the Workflow’s run method is launched, it starts a runflow loop, initially placing a StartEvent in the event queue. If there’s a method that accepts this StartEvent, it starts executing and returns the corresponding event, putting it back into the event queue.

The step method can also directly call the Context’s send_event method to place an event in the queue.

If the runflow loop detects a StopEvent in the queue, it exits the flow and returns the final result.
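To make the signature analysis concrete, here is a simplified, standalone sketch of how a decorator could infer the accepted event types from a method signature. This is my own illustration of the idea, not LlamaIndex's actual implementation:

import inspect
from typing import get_args, get_type_hints

class Event: ...
class StartEvent(Event): ...
class LoopEvent(Event): ...

def accepted_events(fn) -> list[type]:
    hints = get_type_hints(fn)
    # the first non-self, non-ctx parameter is the event argument
    params = [p for p in inspect.signature(fn).parameters if p not in ("self", "ctx")]
    hint = hints[params[0]]
    args = get_args(hint)          # non-empty for unions like StartEvent | LoopEvent
    return list(args) if args else [hint]

async def begin(self, ev: StartEvent | LoopEvent): ...

print(accepted_events(begin))      # prints the two accepted classes: StartEvent and LoopEvent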

With a basic understanding of the elements and implementation principles, we can now explore the advantages and shortcomings of the Workflow through a hands-on project.


Hands-on Project

In today’s hands-on project, we will help the supermarket’s purchasing manager create a system to manage SKU inventory based on customer feedback, demonstrating Workflow’s branching and looping control, Streaming events, and concurrent execution features.

Branching and looping control

In the first version of the feedback monitor, we'll continuously watch the latest feedback for a given SKU, analyze the sentiment implied in the input, and then take the corresponding actions.

The entire code logic is shown in the diagram below:

Flowchart of our FeedbackMonitor program. Image by Author

First, we’ll define an InventoryManager class that uses async to implement the place_order and clear_out methods.

class InventoryManager:
    async def place_order(self, sku: str) -> None:
        await asyncio.sleep(0.5)
        print(f"Will place an order for {sku}")

    async def clear_out(self, sku: str) -> None:
        await asyncio.sleep(0.5)
        print(f"Will clear out {sku}")

We also need to implement four events: LoopEvent, GetFeedbackEvent, OrderEvent, and ClearEvent, all of which are subclasses of BaseEvent, ensuring they follow a unified message-passing interface.

class LoopEvent(BaseEvent):
    ...

class GetFeedbackEvent(BaseEvent):
    ...

class OrderEvent(BaseEvent):
    ...

class ClearEvent(BaseEvent):
    ...

Next, we start implementing the FeedbackMonitorWorkflow class, which contains the core business logic.

class FeedbackMonitorWorkflow(Workflow):
    def __init__(self, total_cycle: int = 1, *args, **kwargs) -> None:
        self.total_cycle = total_cycle
        self.counter = 0
        self.manager = InventoryManager()
        super().__init__(*args, **kwargs)

    @step
    async def begin(self, ev: StartEvent | LoopEvent) \
            -> GetFeedbackEvent | StopEvent:
        print("We now return to the begin step")
        if isinstance(ev, StartEvent):
            self.sku = ev.payload

        if self.counter < self.total_cycle:
            await asyncio.sleep(3)
            self.counter += 1
            return GetFeedbackEvent(payload=self.sku)
        else:
            return StopEvent(result="We're done for the day.")

    @step
    async def get_feedback(self, ev: GetFeedbackEvent) -> OrderEvent | ClearEvent:
        print(f"Wil get the latest feedback for {ev.payload}")
        if random.random() < 0.3:
            return ClearEvent(payload='Bad')
        else:
            return OrderEvent(payload='Good')

    @step    
    async def order(self, ev: OrderEvent) -> LoopEvent:
        print(f"We now buy some sku with feedback {ev.payload}.")
        await self.manager.place_order(self.sku)
        return LoopEvent(payload="Start a new cycle.")

    @step
    async def clear(self, ev: ClearEvent) -> LoopEvent:
        print(f"We now sell some sku with feedback {ev.payload}")
        await self.manager.clear_out(self.sku)
        return LoopEvent(payload="Start a new cycle.")
  1. The begin method is our entry point, accepting StartEvent and LoopEvent.
  2. The StartEvent is the default event that starts the code, and we pass the SKU through this event.
  3. The GetFeedbackEvent triggers the get_feedback method to obtain feedback information. For simplicity, we use the random module to generate one of two feedback values, "Good" or "Bad", and then return the corresponding OrderEvent or ClearEvent based on the feedback.
  4. After a transaction is completed, the LoopEvent reinitiates the begin method for a new round of looping. To simplify the code, we set only one loop.
  5. In each loop, the begin method returns a GetFeedbackEvent to trigger the acquisition of the latest SKU feedback. If all loops are completed, it returns a StopEvent.
  6. When an OrderEvent or ClearEvent is received, the corresponding step method executes the transaction based on the sentiment flag in the message body and returns a LoopEvent to start a new loop.

As you can see, by using events, we can decouple complex loops and branching processes, making it possible for corresponding events to trigger new loops.

Let’s use the draw_all_possible_flows tool to see if the flow chart matches our designed business logic diagram.

draw_all_possible_flows(FeedbackMonitorWorkflow, filename="feedback_monitor_workflow.html")
We use events to decouple branching and looping control. Image by Author

Is that all? If it’s just about decoupling loops and branching controls, couldn’t I achieve that with some coding tricks?

Yes, but flow control is just the most superficial layer. Next, let’s experience the powerful potential unleashed by combining asyncio with Workflow.

Streaming events

When building an agent chain, one of the most headache-inducing issues is how to feed back messages during the execution process to users, helping them understand the progress of code execution.

In the code above, we use the print method to print progress in real time on the console, but this approach is not feasible for a web application.

One solution is to launch a separate pipeline to push messages to users in real-time, but when multiple steps are executed concurrently, how to handle this pipeline becomes a challenge.

Fortunately, the Workflow’s Context directly provides a message streaming pipeline, and we can conveniently write messages into this pipeline and handle them uniformly at the calling end through an async for loop.

LlamaIndex Workflow uses a streaming queue to output messages. Image by Author

Let’s modify our previous trading program:

class ProgressEvent(BaseEvent):
    ...

class FeedbackMonitorWorkflowV2(Workflow):
    def __init__(self, total_cycle: int = 1, *args, **kwargs) -> None:
        self.total_cycle = total_cycle
        self.counter = 0
        self.manager = InventoryManager()
        super().__init__(*args, **kwargs)

    @step
    async def begin(self, ctx: Context,
                    ev: StartEvent | LoopEvent) \
            -> GetFeedbackEvent | StopEvent:
        ctx.write_event_to_stream(
            ProgressEvent(payload="We now return to the begin step")
        )
        ...

    @step
    async def get_feedback(self, ctx: Context,
                            ev: GetFeedbackEvent) -> OrderEvent | ClearEvent:
        ctx.write_event_to_stream(
            ProgressEvent(payload=f"Wil get the latest feedback for {ev.payload}")
        )
        ...

    @step    
    async def order(self, ctx: Context,
                  ev: OrderEvent) -> LoopEvent:
        ctx.write_event_to_stream(
            ProgressEvent(payload=f"We now buy some sku with feedback {ev.payload}.")
        )
        ...

    @step
    async def clear(self, ctx: Context,
                   ev: ClearEvent) -> LoopEvent:
        ctx.write_event_to_stream(
            ProgressEvent(payload=f"We now sell some sku with feedback {ev.payload}")
        )
        ...

In the first step, we pass a Context type parameter in the signature of the step method. This lets Workflow know to pass the current execution context into the step method.

Then, we replace the print method with the ctx.write_event_to_stream method to write messages into the pipeline in real time.

Finally, before waiting for the final result, we use the stream_events method to iterate over the latest messages from the message pipeline.

from datetime import datetime

def streaming_log(message: str) -> None:
    current_time = datetime.now().strftime("%H:%M:%S")
    print(f"{current_time} {message}")

feedback_monitor_v2 = FeedbackMonitorWorkflowV2(timeout=10, verbose=False)
handler = feedback_monitor_v2.run(payload="Apple")
async for event in handler.stream_events():
    if isinstance(event , ProgressEvent):
        streaming_log(event.payload)
final_result = await handler
print("Final result: ", final_result)
During code execution, Workflow streams out messages through the streaming queue. Image by Author

Concurrent execution

As mentioned at the beginning of the article, for I/O-bound tasks, we can use the asyncio package to make the code execute concurrently, greatly improving the running efficiency. Workflow implements this mechanism for us, encapsulating the asyncio execution code, and letting us focus on the code logic.

Let’s explain using the FeedbackMonitor project as an example.

We can let multiple steps execute in parallel to optimize execution time. Image by Author

This time, we'll upgrade the project so that the FeedbackMonitor judges whether the feedback is Good or Bad not from a single source, but simultaneously from online feedback, offline feedback, and a machine-learning trend predictor.

First, we add the new events: OnlineEvent, OnlineFeedbackEvent, OfflineEvent, OfflineFeedbackEvent, TrendingPredictionEvent, and PredictionResultEvent, plus a TradeEvent for the final decision.

from collections import Counter

class OnlineEvent(BaseEvent):
    ...

class OnlineFeedbackEvent(BaseEvent):
    ...

class OfflineEvent(BaseEvent):
    ...

class OfflineFeedbackEvent(BaseEvent):
    ...

class TrendingPredictionEvent(BaseEvent):
    ...

class PredictionResultEvent(BaseEvent):
    ...

class TradeEvent(BaseEvent):
    ...

Then, we write a ComplexFeedbackMonitor class as a new Workflow.

class ComplexFeedbackMonitor(Workflow):
    def __init__(self, *args, **kwargs):
        self.manager = InventoryManager()
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ctx: Context, ev: StartEvent) \
            -> OnlineEvent | OfflineEvent | TrendingPredictionEvent:
        self.sku = ev.payload

        ctx.send_event(OnlineEvent(payload=ev.payload))
        ctx.send_event(OfflineEvent(payload=ev.payload))
        ctx.send_event(TrendingPredictionEvent(payload=ev.payload))

    @step    
    async def online_feedback(self, ev: OnlineEvent) -> OnlineFeedbackEvent:
        await asyncio.sleep(random.randint(1, 3))
        if random.random() < 0.3:
            return OnlineFeedbackEvent(payload='Bad')
        else:
            return OnlineFeedbackEvent(payload='Good')

    @step
    async def offline_feedback(self, ev: OfflineEvent) -> OfflineFeedbackEvent:
        await asyncio.sleep(random.randint(1, 3))
        if random.random() < 0.3:
            return OfflineFeedbackEvent(payload='Bad')
        else:
            return OfflineFeedbackEvent(payload='Good')

    @step
    async def trending_predict(self, ev: TrendingPredictionEvent) -> PredictionResultEvent:
        await asyncio.sleep(random.randint(1, 3))
        if random.random() < 0.3:
            return PredictionResultEvent(payload='Bad')
        else:
            return PredictionResultEvent(payload='Good')

    @step
    async def trading_decision(self, ctx: Context,
                               ev: OnlineFeedbackEvent | OfflineFeedbackEvent | PredictionResultEvent) \
            -> TradeEvent:
        results = ctx.collect_events(ev, 
            [OnlineFeedbackEvent, OfflineFeedbackEvent, PredictionResultEvent])
        if results is not None:
            voting = dict(Counter([ev.payload for ev in results]))
            print(voting)
            feedback = max(voting, key=voting.get)
            return TradeEvent(payload=feedback)

    @step        
    async def trade(self, ev: TradeEvent) -> StopEvent:
        feedback = ev.payload
        match feedback:
            case 'Good':
                await self.manager.place_order(self.sku)
            case 'Bad':
                await self.manager.clear_out(self.sku)
            case _:
                print("Do nothing")
        return StopEvent(result='We are done for the day.')

In the start method, we use ctx.send_event to simultaneously throw out OnlineEvent, OfflineEvent, and TrendingPredictionEvent. Since Workflow determines which messages were thrown out based on the typing annotation of the step method, we still need to mark the returned message types.

Next, we implement the online_feedback, offline_feedback, and trending_predict methods to obtain trading signals and return the corresponding events.

We still use the random method to simulate customer feedback analysis.

As content from different sources requires different parsing times, we hope to wait until all messages return before making a trading decision. At this point, we can use the ctx.collect_events method in the trading_decision method.

Each time a new feedback event returns, the trading_decision method executes once.

But ctx.collect_events takes all the events we need to wait for as parameters and returns None until every feedback event has arrived. At that point, the return value is a list of the three feedback events.

We can use the Counter method to count how many times "Good" and "Bad" appear, then take the most voted mark to make a trading decision.
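As a quick worked example of that voting step (the feedback values below are made up):

from collections import Counter

feedback_values = ["Good", "Bad", "Good"]      # hypothetical results from the three sources
voting = dict(Counter(feedback_values))        # {'Good': 2, 'Bad': 1}
decision = max(voting, key=voting.get)         # 'Good'
print(voting, decision)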

Finally, let’s use the draw_all_possible_flows tool to see how cool our newly designed workflow is:

draw_all_possible_flows(ComplexFeedbackMonitor, filename='complex_feedback_monitor.html')
Workflow executes three asynchronous tasks in parallel and gets the final result. Image by Author

Next, let’s execute this workflow and see.

feedback_monitor = ComplexFeedbackMonitor(timeout=20, verbose=True)
result = await feedback_monitor.run(payload='Apple')
print(result)
Detailed process of executing code in parallel with Workflow. Image by Author

We can observe that the three methods to obtain feedback from different sources are triggered simultaneously but return at different times.

The first two returned events can trigger the trading_decision method, but cannot continue to trigger the TradeEvent. Only after all three events return and the final trading decision is calculated, is the TradeEvent triggered.

As you can see, with the power of Workflow, we can indeed make our code architecture both clear and efficient.

But don’t be too optimistic, because after some time in practice, I think there are still some shortcomings.


Time to Talk about Shortcomings

If you review our previous code, you’ll notice that all our code logic is written in the same Workflow, which is fine for simple applications but a disaster for complex real-world applications.

Ideally, we should split different logic into Workflows to maintain the purity of the "single responsibility" principle. The official solution to this requirement is nested workflows:

Nested workflows

Suppose we want to split the trading order logic from the FeedbackMonitor into an independent Workflow. How should we call it when we need to place an order?

The official solution is nested workflows: declare workflow B as a parameter of a step method in workflow A, then, after workflow A is instantiated, register an instance of workflow B on it. As shown in the following code:

class OrderStation(Workflow):
    def __init__(self, *args, **kwargs):
        self.manager = InventoryManager()
        super().__init__(*args, **kwargs)

    @step
    async def trade(self, ev: StartEvent) -> StopEvent:
        print("We are now in a new workflow named OrderStation")
        feedback = ev.feedback
        match feedback:
            case 'Good':
                await self.manager.place_order(ev.sku)
            case 'Bad':
                await self.manager.clear_out(ev.sku)
        return StopEvent(result="Done!")

class ComplexFeedbackMonitorV2(ComplexFeedbackMonitor):
    @step
    async def trade(self, ev: TradeEvent, order_station: OrderStation) -> StopEvent:
        feedback = ev.payload
        await order_station.run(feedback=feedback, sku=self.sku)
        return StopEvent(result='We are done for the day.')
feedback_monitor_v2 = ComplexFeedbackMonitorV2(timeout=20, verbose=False)
feedback_monitor_v2.add_workflows(
    order_station=OrderStation(timeout=10, verbose=True)
)
result = await feedback_monitor_v2.run(payload='Apple')
print(result)

Wait a minute: if you have Java development experience, you might be surprised by this code. Isn't this dependency injection?

Nested Workflow works like "dependency injection". Image by Author
Nested Workflow works like "dependency injection". Image by Author

It's indeed similar to dependency injection, but the difference is that we still need to explicitly add the specific workflow instance after initialization, so there is still coupling. That's the first problem.

Another problem I found during coding is that for nested workflows, I can only call them through the run method, not by calling the corresponding step method in the nested workflow from an external workflow.

Therefore, this is not a good solution for communication between workflows.

Communicate between Workflows

So, is there any way to truly achieve communication between workflows? I searched the API documentation and couldn’t find an official solution, and I noticed that this issue also went unanswered. So I decided to try it myself to see if I could solve it.

After reviewing the source code again, I think the ctx.send_event method has some potential, so my first thought was: could sharing the same Context between two workflows solve it?

I noticed that instantiating a Context requires passing in a workflow instance, and a workflow's own Context can be set by passing it to the run method.

So the code is as follows. The two workflows stay unchanged, except that the step method in OrderStation no longer accepts a StartEvent but a specific TradeEventV2.

class TradeEventV2(Event):
    feedback: str
    sku: str

class OrderStation(Workflow):
    def __init__(self, *args, **kwargs):
        self.manager = InventoryManager()
        super().__init__(*args, **kwargs)

    @step
    async def trade(self, ev: TradeEventV2) -> StopEvent:
        print("We are now in a new workflow named OrderStation")
        feedback = ev.feedback
        match feedback:
            case 'Good':
                await self.manager.place_order(ev.sku)
            case 'Bad':
                await self.manager.clear_out(ev.sku)
        return StopEvent(result="Done!")

class ComplexFeedbackMonitorV3(ComplexFeedbackMonitor):
    @step
    async def trade(self, ctx: Context, ev: TradeEvent) -> StopEvent | TradeEventV2:
        feedback = ev.payload
        ctx.send_event(
            TradeEventV2(feedback=feedback, sku=self.sku)
        )
        return StopEvent(result='We are done for the day.')

Then I planned to create a Context instance from the OrderStation and pass it into the FeedbackMonitor's run method. But as soon as I ran the new workflow, it threw an error:

feedback_monitor_v3 = ComplexFeedbackMonitorV3(timeout=20, verbose=False)
result = await feedback_monitor_v3.run(payload='Apple')
print(result)
I get an error when I use Context to communicate between two Workflows. Image by Author
I get an error when I use Context to communicate between two Workflows. Image by Author

It seems there is a problem with the method signature validation. Let's try turning off the validation and passing in the shared Context:

feedback_monitor_v3 = ComplexFeedbackMonitorV3(timeout=20, verbose=False, disable_validation=True)
order_station = OrderStation(timeout=10, verbose=True)
result = await feedback_monitor_v3.run(ctx=Context(workflow=order_station),
                           payload='Apple')
print(result)

Still no luck. It seems this approach won't work.

The trade method in OrderStation is not triggered. Image by Author
The trade method in OrderStation is not triggered. Image by Author

Unbound syntax

Then, I noticed that the documentation mentioned a kind of Unbound syntax, which seems to be able to decouple each step’s logic from the Workflow. The example code is as follows:

class TestWorkflow(Workflow):
    ...

@step(workflow=TestWorkflow)
def some_step(ev: StartEvent) -> StopEvent:
    return StopEvent()

Although we can still only run everything within one Workflow, it made me see the feasibility of communication between modules.

Due to the length of the article, I won't walk through a full implementation here. Instead, here is a diagram of how Unbound syntax can be used for module communication, with a minimal code sketch after the explanation:

A diagram to describe how Unbound syntax decouples code logic into multiple modules. Image by Author
A diagram to describe how Unbound syntax decouples code logic into multiple modules. Image by Author

As shown in the diagram: First, we can define an Application class as a Workflow pipeline, and simultaneously define the required events.

Then, each project team can write their own business logic code and use different step methods to listen and send messages externally.

Finally, we can call the Application's run method from a FastAPI endpoint to orchestrate the various modules and complete the task.

In this way, business logic can be split into different modules for development, and then different step methods can be called using events.
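
To make the diagram more concrete, here is a minimal sketch of the layout it describes. Everything except the @step(workflow=...) syntax shown earlier is hypothetical: the Application class, the OrderEvent, and the two step functions are made-up stand-ins for the shared pipeline and the two teams' modules.

from llama_index.core.workflow import (
    Event, StartEvent, StopEvent, Workflow, step,
)

# Shared pipeline definition (e.g. application.py), owned by the platform team
class OrderEvent(Event):  # hypothetical event passed between modules
    sku: str

class Application(Workflow):
    ...

# Team A's module: turns the start request into an OrderEvent
@step(workflow=Application)
async def intake(ev: StartEvent) -> OrderEvent:
    return OrderEvent(sku=ev.payload)

# Team B's module: consumes the OrderEvent and finishes the run
@step(workflow=Application)
async def fulfil(ev: OrderEvent) -> StopEvent:
    print(f"Placing order for {ev.sku}")
    return StopEvent(result="Done")

# Finally, e.g. inside a FastAPI endpoint:
# result = await Application(timeout=10).run(payload="Apple")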

This indeed achieves the purpose of logic decoupling. However, since this approach only registers each step to the Workflow through the step decorator (via add_step), it still does not achieve real communication between Workflows.


Summary

LlamaIndex's new Workflow feature makes parallel execution of RAG, LLM generation, and I/O calls a very simple task, and its event-driven architecture also lets the program decouple itself from complex logic control.

In today’s article, I demonstrated several features of Workflow through a FeedbackMonitor project.

In project practice, we also found that Workflow still has shortcomings in communication between modules, and we discussed different solutions including nested workflows and unbound syntax.

Finally, as agent frameworks like Langchain and AutoGen start to propose their own event-driven architectures, I believe Workflow is on the right path and will see long-term development. Let’s keep an eye on it.


Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome – let’s discuss in the comments below!

This article was originally published on Data Leads Future.

The post Deep Dive into LlamaIndex Workflow: Event-Driven LLM Architecture appeared first on Towards Data Science.

]]>
The Math Behind Keras 3 Optimizers: Deep Understanding and Application https://towardsdatascience.com/the-math-behind-keras-3-optimizers-deep-understanding-and-application-2e5ff95eb342/ Sat, 17 Aug 2024 14:46:25 +0000 https://towardsdatascience.com/the-math-behind-keras-3-optimizers-deep-understanding-and-application-2e5ff95eb342/ This is a bit different from what the books say.

The post The Math Behind Keras 3 Optimizers: Deep Understanding and Application appeared first on Towards Data Science.

]]>
Introduction

Optimizers are an essential tool for everyone working in machine learning.

We all know optimizers determine how a model converges on the loss function during gradient descent. Thus, using the right optimizer can boost the performance and efficiency of model training.

Besides classic papers, many books explain the principles behind optimizers in simple terms.

However, I recently found that the performance of Keras 3 optimizers doesn’t quite match the mathematical algorithms described in these books, which made me a bit anxious. I worried about misunderstanding something or about updates in the latest version of Keras affecting the optimizers.

So, I reviewed the source code of several common optimizers in Keras 3 and revisited their use cases. Now I want to share this knowledge to save you time and help you master Keras 3 optimizers more quickly.

If you’re not very familiar with the latest changes in Keras 3, here’s a quick rundown: Keras 3 integrates TensorFlow, PyTorch, and JAX, allowing us to use cutting-edge Deep Learning frameworks easily through Keras APIs.

Keras 3.0 Tutorial: End-to-End Deep Learning Project Guide


Preparation

Some utility methods

I plan to show you through some charts how these optimizers affect the convergence of loss functions.

Before starting, I need to prepare some utility methods for creating these charts.

First, I’ll use sklearn to generate a virtual dataset for a classification task:

from sklearn.datasets import make_moons
X, y = make_moons(n_samples=300, noise=0.1, random_state=42)

Since the dataset is quite simple, I plan to build a basic multilayer perceptron model with three hidden layers and a dual-node output layer:

import keras
from keras import layers, utils, ops
import matplotlib.pyplot as plt

def build_model(input_shape: tuple):
    inputs = layers.Input(shape=input_shape)
    x = layers.Dense(12, activation='relu')(inputs)
    x = layers.Dense(13, activation='relu')(x)
    x = layers.Dense(13, activation='relu')(x)
    outputs = layers.Dense(2, activation='softmax')(x)
    return keras.Model(inputs=inputs, outputs=outputs)

Finally, our utility method will evaluate how different optimizers affect model loss convergence. It also takes a parameter for epochs to highlight details of some optimizers.

This method will also use the matplotlib library to plot the model loss convergence curves, making the assessment more visual:

Python">def fit_show_model(optimizer: keras.optimizers.Optimizer,
                      epochs: int = 800
                    ):
    my_model = build_model(input_shape=(2,))
    my_model.compile(optimizer=optimizer, 
                     loss='sparse_categorical_crossentropy', 
                     metrics=['accuracy'])
    history = my_model.fit(X, y, batch_size=32, epochs=epochs, verbose=0)
    loss=history.history['loss']
    _, ax = plt.subplots(figsize=(5, 3))
    ax.plot(range(len(loss)), loss, 'b')
    ax.set(
        xlabel="Epoch",
        ylabel="Loss"
    )
    plt.show()

Variable abbreviations

Since this article is full of mathematical expressions, I plan to abbreviate some common variables to make these expressions clearer. For example:

  • lr stands for learning_rate.
  • g represents the gradient at the current node.
  • e is epsilon, a very small number added to the denominator to prevent it from being zero.
  • sqrt refers to np.sqrt or ops.sqrt, which is used to take the square root of an expression.

After these preparations, I will start explaining each optimizer.


Detailed Explanation of Common Optimizers

SGD

Also known as Stochastic Gradient Descent, it’s almost everyone’s first encounter with an optimizer.

Its principle is simple: randomly select a small batch of data samples, calculate the gradient at each node, and then update the weight of the current node using the learning rate.

In Keras 3, its mathematical expression is as follows:

w = w - lr * g

We can use SGD as a baseline to visually compare the effects of other optimizers:

fit_show_model(optimizer=keras.optimizers.SGD(learning_rate=0.01))
With SGD, the losses will converge to 0 in about 500 epochs. Image by Author
With SGD, the losses will converge to 0 in about 500 epochs. Image by Author

SGD’s biggest drawback is that its learning rate is fixed at every point on the curve, leading to quick weight changes on steep slopes and slow changes on flat ones. This makes it easy to get stuck in local minima (a phenomenon well-documented elsewhere, so I won’t repeat it here).

How can we avoid getting stuck? Just like helping a car out of the mud, we can give momentum to the weight changes to push it forward, solving this problem.

The mathematical expression with momentum is shown below:

m = momentum * m - lr * g
w = w + m

You can see that momentum speeds up loss convergence:

fit_show_model(optimizer=keras.optimizers.SGD(learning_rate=0.01, 
                                              momentum=0.9))
You can see that momentum speeds up loss convergence. Image by Author
You can see that momentum speeds up loss convergence. Image by Author

As mentioned before, since SGD's learning rate is fixed, we need to set a reasonable learning rate. If it's too high, the weights will oscillate back and forth across the valley. If it's too low, the weights will take longer to find the valley and are more likely to get stuck.
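
If you want to see this trade-off yourself, one quick experiment is to rerun the baseline with an exaggeratedly large and an exaggeratedly small learning rate and compare the resulting loss curves. This is just a sketch reusing the fit_show_model helper from the Preparation section; the two values are illustrative extremes, not recommendations.

# Illustrative extremes for SGD's fixed learning rate
fit_show_model(optimizer=keras.optimizers.SGD(learning_rate=1.0), epochs=200)
fit_show_model(optimizer=keras.optimizers.SGD(learning_rate=0.0001), epochs=200)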

To address this, besides using momentum, we can also predict the next direction of the weight change and add this prediction to the current weight. This method is called the Nesterov method. Its mathematical expression is as follows:

m = momentum * m - lr * g
w = w + (momentum * m - lr * g)
fit_show_model(optimizer=keras.optimizers.SGD(learning_rate=0.01,
                                              momentum=0.9,
                                              nesterov=True))
The Nesterov method allows for smoother convergence of loss. Image by Author
The Nesterov method allows for smoother convergence of loss. Image by Author

You can see that in Keras 3, using the Nesterov method requires setting the momentum parameter first; otherwise, we can’t predict the direction of the next weight change.

Adagrad

After discussing algorithms related to stochastic gradient descent, let’s talk about optimizers related to adaptive algorithms. The simplest one is Adagrad.

The core idea of adaptive algorithms is to dynamically adjust the learning rate as training progresses.

Based on this idea, Adagrad adjusts the learning rate by accumulating the sum of the squares of the gradients from past iterations and dividing the learning rate by the square root of this sum. In Keras 3, Adagrad's mathematical expression is as follows:

accumulator = accumulator + g**2
w = w - (lr * g) / sqrt(accumulator + e)
fit_show_model(optimizer=keras.optimizers.Adagrad(learning_rate=0.01))
Adagrad does not converge losses very quickly. Image by Author
Adagrad does not converge losses very quickly. Image by Author

From the chart, we can see that Adagrad does not converge losses very quickly, even slower than the default SGD algorithm, because the learning rate decreases as training progresses.
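
To see why, here is a tiny numeric illustration of the expression above. It is a standalone sketch with a made-up constant gradient of 0.5: even though the gradient never changes, the effective step keeps shrinking as the accumulator grows.

import numpy as np

lr, e = 0.01, 1e-7
accumulator = 0.0
for step_idx in range(1, 6):
    g = 0.5  # constant gradient, purely illustrative
    accumulator += g**2  # accumulator = accumulator + g**2
    effective_step = lr * g / np.sqrt(accumulator + e)
    print(step_idx, round(effective_step, 5))  # 0.01, 0.00707, 0.00577, 0.005, 0.00447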

RMSprop

To address the issue of Adagrad decreasing the learning rate too much, Hinton proposed an improved algorithm in 2012. This algorithm doesn’t simply add up the squares of the gradients; it assigns a weight to the square of each gradient, giving more weight to recent iterations. The rest of the calculation is similar to Adagrad.

v = rho * v + (1 - rho) * g**2
w = w - lr * g / sqrt(v + e)
fit_show_model(optimizer=keras.optimizers.RMSprop(learning_rate=0.01),
               epochs=100)
RMSprop is much faster than Adagrad and SGD. Image by Author
RMSprop is much faster than Adagrad and SGD. Image by Author

As you can see, although loss convergence isn’t very stable, it reaches near zero around 40 epochs, much faster than Adagrad and SGD.

In Keras 3, RMSprop also supports setting momentum and centered parameters. Momentum adds momentum to the weight changes, and if the centered parameter is set, the optimizer doesn’t directly use the accumulated square of the gradients but makes a correction using the moving average of the gradients. The expression is as follows:

v = rho * v + (1 - rho) * g**2
average_grad = rho * average_grad + (1 - rho) * g
m = momentum * m + (lr * g) / sqrt(v - average_grad**2 + e)
w = w - m
fit_show_model(optimizer=keras.optimizers.RMSprop(learning_rate=0.01,
                                                  momentum=0.9,
                                                  centered=True),
               epochs=100)
RMSprop converges losses faster but still has stability issues. Image by Author
RMSprop converges losses faster but still has stability issues. Image by Author

You can see that using the momentum and centered parameters, RMSprop converges losses faster but still has stability issues.

Adam

Let’s talk about the Adam optimizer. Unlike the previous two algorithms, Adam not only uses the accumulated square of the gradients but also the first moment of the gradients. So, Adam has two extra hyperparameters: beta_1 and beta_2.

In Keras 3, Adam has evolved further. Now, it adjusts beta_1 and beta_2 exponentially based on the current step of the iteration, affecting the size of the learning rate. This evolution makes the Adam optimizer very suitable for time-sensitive scenarios like speech recognition:

t = 0 # current iteration 
local_step = t + 1 
beta_1_power = power(beta_1, local_step) 
beta_2_power = power(beta_2, local_step) 
alpha = lr * sqrt(1 - beta_2_power) / (1 - beta_1_power) 
m = m + (1 - beta_1) * (g - m) 
v = v + (1 - beta_2) * (g**2 - v) 
w = w - (alpha * m) / (sqrt(v) + e)
fit_show_model(optimizer=keras.optimizers.Adam(learning_rate=0.01),
               epochs=100)
Adam optimizer converges losses very quickly and smoothly. Image by Author
Adam optimizer converges losses very quickly and smoothly. Image by Author

From the chart, you can see that the Adam optimizer converges losses very quickly and smoothly, reaching near zero by the 20th epoch.

AdamW

In Keras 3, there is also an optimizer called AdamW, which, as the name suggests, is similar to Adam but adjusts the weights with a constant decay amount through the weight_decay parameter.

w = w - weight_decay * lr * w
fit_show_model(optimizer=keras.optimizers.AdamW(learning_rate=0.01,
                                                weight_decay=0.01),
               epochs=100)
AdamW is similar to Adam but adjusts the weights with a constant decay. Image by Author
AdamW is similar to Adam but adjusts the weights with a constant decay. Image by Author

From the source code, it’s clear that the AdamW optimizer is actually calling the Adam optimizer and assigning a value to the weight_decay parameter.
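
If you want to verify this relationship yourself, here is a quick sketch. It only touches the public Keras 3 classes; the expectation that AdamW subclasses Adam reflects my reading of the source, so treat the printed results as a check rather than a guarantee.

import keras

# AdamW is expected to be a subclass of Adam that simply enforces weight_decay
print(issubclass(keras.optimizers.AdamW, keras.optimizers.Adam))

# Plain Adam also accepts the decoupled weight_decay argument from the base Optimizer
adam_with_decay = keras.optimizers.Adam(learning_rate=0.01, weight_decay=0.01)
print(adam_with_decay.weight_decay)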

Nadam

Then there is the Nadam optimizer, which, as the name suggests, is a variant of the Adam optimizer.

In Keras 3, it incorporates the idea of Nesterov, not only focusing on the current iteration step but also on the impact of the next step. Then it combines these two effects on the beta_1 and beta_2 parameters.

So, of all the optimizers, Nadam’s algorithm is the most complex:

t = 0 
decay = 0.96 
local_step = t + 1 
next_step = t + 2

u_t = beta_1 * (1.0 - 0.5 * power(decay, local_step)) 
u_t_1 = beta_2 * (1.0 - 0.5 * power(decay, next_step)) 
u_product_t = u_product_t * beta_1 * (1.0 - 0.5 * power(decay, local_step))

u_product_t_1 = u_product_t * u_t_1

beta_2_power = power(beta_2, local_step)
m = m + (1 - beta_1) * (g - m) 
v = v + (1 - beta_2) * (g**2 - v)
m_hat = u_t_1 * m / (1 - u_product_t_1) + ((1 - u_t) * g) / (1 - u_product_t) 
v_hat = v / (1 - beta_2_power)

w = w - (lr * m_hat) / (sqrt(v_hat) + epsilon)
fit_show_model(optimizer=keras.optimizers.Nadam(learning_rate=0.01),
               epochs=100)
For some simple classification tasks, the improvement from Nadam is not much. Image by Author
For some simple classification tasks, the improvement from Nadam is not much. Image by Author

However, for some simple classification tasks, the improvement from Nadam is not much.

Lion

Finally, let me mention the Lion optimizer, a new implementation proposed by Chen et al., 2023. This optimizer’s biggest feature is that it doesn’t calculate accumulations, so it uses less memory. It also doesn’t calculate the second moment, so it’s less complex.

It simply calculates a sign based on the current gradient and lets the weight and learning rate change according to this sign:

w = w - lr * sign(beta_1 * m + (1 - beta_1) * g)
m = beta_2 * m + (1 - beta_2) * g
fit_show_model(optimizer=keras.optimizers.Lion(learning_rate=0.001),
               epochs=100)
The performance of Lion optimizer isn't as good as the Adam series. Image by Author
The performance of Lion optimizer isn’t as good as the Adam series. Image by Author

You can see that the performance of this optimizer isn’t as good as the Adam series, probably the price paid for saving resources.


Conclusion

As mentioned at the beginning of the article, optimizers are one of the most critical parts of deep learning, and knowing how to choose and apply them is a skill every practitioner needs to master.

Also, with technological advancements, the implementation of optimizers within the latest deep learning frameworks is continuously evolving in terms of computational efficiency and application scenarios.

This article introduced the mathematical implementation of several common optimizers in Keras 3.

If you’re still confused about how to use the optimizers in Keras 3, I suggest starting with Adam. After achieving good results, you can choose a more suitable optimizer based on specific scenarios.

What else would you like to know about Keras 3? Feel free to leave comments and discuss. See you next time.


Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome – let’s discuss in the comments below!

This article was originally published on Data Leads Future.

The post The Math Behind Keras 3 Optimizers: Deep Understanding and Application appeared first on Towards Data Science.

]]>
Keras 3.0 Tutorial: End-to-End Deep Learning Project Guide https://towardsdatascience.com/keras-3-0-tutorial-end-to-end-deep-learning-project-guide-3552187e3ff5/ Sat, 18 May 2024 14:25:54 +0000 https://towardsdatascience.com/keras-3-0-tutorial-end-to-end-deep-learning-project-guide-3552187e3ff5/ Implement an encoder-decoder recurrent network from scratch

The post Keras 3.0 Tutorial: End-to-End Deep Learning Project Guide appeared first on Towards Data Science.

]]>
Keras 3.0 Tutorial: End-to-End Deep Learning Project Guide. Image by Author
Keras 3.0 Tutorial: End-to-End Deep Learning Project Guide. Image by Author

Introduction

Even though I started using Pytorch a while ago, I still miss the concise code style of Keras and the good old days when you could implement a neural network model in just a few lines of code.

So, I was thrilled when Keras announced last November that in addition to TensorFlow, it now also supports Pytorch and Jax as backends!

However, things weren’t perfect: since Keras 3.0 was released not long ago, the related tutorials and documentation hadn’t caught up, and I encountered some troubles during the code migration.

Luckily, after some effort, I can now smoothly use version 3.0 for various end-to-end model developments.

In this article, I’ll share some practical experiences with Keras 3.0 to help you avoid some detours. I’ll use a typical encoder-decoder recurrent neural network as an example to explain how to complete an end-to-end project from scratch using the subclassing API of Keras 3.0, and discuss details to consider when using Pytorch as the backend.

Let’s get started.


Framework Installation and Environment Setup

Framework installation

Installing Keras 3.0 (or the latest version) is simple, just follow the Getting Started documentation on the official website.

Before installing Keras, it’s recommended to install Pytorch with the corresponding CUDA version first. Either CUDA 11.8 or CUDA 12.1 works, depending on your graphics card driver support.

Although Pytorch can be used as a backend, Tensorflow version 2.16.1 is still installed by default during the Keras installation process.

This version of Tensorflow is compiled based on CUDA 12.3, so after installing Keras, you might encounter a warning about missing CUDA (see this issue).

Could not find cuda drivers on your machine, GPU will not be used.

Since we are using Pytorch as the backend, my advice is to ignore this warning.

Alternatively, you can set a system variable to permanently turn off Tensorflow’s logs.

import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

Environment Configuration

After installing both Pytorch and Keras, you need to set the environment variable to configure Keras’s backend to Pytorch. There are two ways to do this:

  • Modify the configuration file.
  • Set an environment variable.

First, let’s discuss using the configuration file method.

Keras’s configuration file is located in ~/.keras/keras.json. If you are using Windows, this file is located in your <user directory>/.keras/keras.json.

Of course, you can also change the location of the .keras directory by setting the KERAS_HOME environment variable.

Note, you might not find the .keras directory immediately after installing Keras for the first time. At that point, you can execute import keras in IPython or Jupyter Notebook to locate the directory.

Then, just change the value of the "backend" key in the keras.json file to "torch".

{
    "floatx": "float32",
    "epsilon": 1e-07,
    "backend": "torch",
    "image_data_format": "channels_last"
}

If you are in a production system or using cloud environments like Colab, you might not be able to modify the configuration file. In such cases, you can resolve this by setting an environment variable:

os.environ["KERAS_BACKEND"] = "torch"

Once the Keras backend is configured, you can confirm it with the following code:

In:  import keras
     keras.config.backend()

Out: 'torch'

After the preparations are done, we’ll officially start our project practice for today.


Project in Action: An End-to-End Example

The fastest way to learn a framework is through real project practice. So now it’s time to fulfill my promise.

I will guide you through using the subclassing API step by step to implement a neural machine translation (NMT) model and explain some details of using Keras 3.0.

Theory introduction

If you’re not familiar with the NMT model, here’s a brief introduction:

NMT is a type of recurrent neural network model based on an encoder-decoder architecture.

In this architecture, there is an embedding layer and an RNN (we use LSTM in this article) layer forming an encoder, and another embedding layer and RNN layer forming a decoder.

The original text, after being vectorized, is input into the encoder module. After a series of steps, the final state is input into the decoder module.

Additionally, the target text is also input into the decoder module, but before entering the decoder, it is offset by one step forward. Thus, the beginning part of the target text starts with a start-of-sequence (SOS) placeholder.

The state passed in from the encoder and the target text input are processed in the decoder through a series of recurrent calculations and are finally passed to a Dense layer, which is activated to calculate the probability of each word vector; these probabilities are compared with the target text's word vectors to calculate the loss.

Therefore, we also add an end-of-sequence (EOS) placeholder at the end of the target text to mark the text’s end.
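
Here is a tiny illustration of that offset, using a made-up sentence; the startofseq and endofseq strings match the Configure constants defined later in the code.

target = "la lluvia duró tres días"
decoder_input = f"startofseq {target}"  # shifted one step forward, fed to the decoder
decoder_target = f"{target} endofseq"   # what the output layer is trained to predict
print(decoder_input)
print(decoder_target)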

The entire architecture is shown in the following diagram:

Of course, due to the popularity of the Transformer architecture, Keras’ KerasNLP package also offers various pre-trained models like Bert and GPT for completing NLP tasks.

However, this article focuses on understanding how to use Keras 3.0, so using a basic RNN network will be enough.

Modules and flowchart

Since this is a production-ready project, we build modules based on Keras 3.0’s subclassing API.

For a clear understanding of each module and their interactions, I’ve created the flowchart below:

The modules and flowchart of this project. Image by Author
The modules and flowchart of this project. Image by Author

We’ll write our code according to the design on the flowchart.

Importing packages

In a Jupyter Notebook environment, I like to import all related packages at the start of the project.

This way, if I find something missing in the middle, I just need to add it in one place, instead of searching for the import cell:

from pathlib import Path
import pickle

import keras
from keras import layers, utils
import numpy as np

utils.set_random_seed(42)

Here’s a little tip: the utils.set_random_seed method can set the random seeds for Python, Numpy, and Pytorch all in one line of code, which is super convenient.

Data preparation

Before we start, we need to choose suitable data. Like past encoder-decoder models, we also chose the spa-eng text dataset.

This dataset is provided by contributors to the Tatoeba project and contains 120,000 sentence pairs. It is released under the Creative Commons Attribution 2.0 France license, and you can download the dataset from here.

After downloading, let’s first check the contents of the spa.txt file:

The rain lasted three days. La lluvia duró tres días. CC-BY 2.0 (France) Attribution: tatoeba.org #27004 (CK) & #431740 (Leono)
The refrigerator is closed. El frigorífico está cerrado. CC-BY 2.0 (France) Attribution: tatoeba.org #5152850 (CarpeLanam) & #10211587 (manufrutos)
The reports were confusing. Los informes eran confusos. CC-BY 2.0 (France) Attribution: tatoeba.org #2268485 (_undertoad) & #2268486 (cueyayotl)
The resemblance is uncanny. La similitud es extraña. CC-BY 2.0 (France) Attribution: tatoeba.org #2691302 (CM) & #5941808 (albrusgher)
The resemblance is uncanny. El parecido es asombroso. CC-BY 2.0 (France) Attribution: tatoeba.org #2691302 (CM) & #6026125 (albrusgher)
The results seem promising. Los resultados se antojan prometedores. CC-BY 2.0 (France) Attribution: tatoeba.org #8480484 (shekitten) & #8464272 (arh)
The rich have many friends. Los ricos tienen muchos amigos. CC-BY 2.0 (France) Attribution: tatoeba.org #1579047 (sam_m) & #1457378 (marcelostockle)

As you can see, the content includes at least three columns, with the first column being the original text and the second column being the target text, separated by tabs.

Since the file isn’t large, we can directly use numpy’s genfromtxt method to read this dataset.

text_file = Path("./temp/eng-spanish/spa-eng/spa.txt")

pairs = np.genfromtxt(text_file, delimiter="\t", dtype=str,
                     usecols=(0, 1), encoding="utf-8",
                     autostrip=True,
                     converters={1: lambda x: x.replace("¡", "").replace("¿", "")})
np.random.shuffle(pairs)
sentence_en, sentence_es = pairs[:, 0], pairs[:, 1]

Next, let’s check the processing results:

In:   print(f"{sentence_en[0]} => {sentence_es[0]}")

Out:  I'm really sorry. => Realmente lo siento.

OK, no problems.

Data preprocessing

Next, we need to preprocess the text content to convert it into word vector data.

First, we define some constants:

class Configure:
    VOCAB_SIZE: int = 1000
    MAX_LENGTH: int = 50
    SOS: str = 'startofseq'
    EOS: str = 'endofseq'

Then, we start our data processing pipeline.

Note that in Keras 3.0, although you have chosen Pytorch as the backend, the TextVectorization Layer is still implemented based on TensorFlow.

Therefore, you cannot use TextVectorization as a layer in the Keras Model but must use it separately in the preprocessing pipeline.

This leads to a problem: when we migrate the trained model to the production system for inference tasks, without the TextVectorization vocabulary, we cannot perform vectorization.

So, we need to persist the vocabulary and reuse it, but there are some issues with the persistence of Keras 3.0’s TextVectorization, which I will discuss later.

I will use a TextPreprocessor module to perform the vectorization. Here is the specific code:

class TextPreprocessor:
    def __init__(self, 
                 en_config = None, es_config = None):
        if en_config is None:
            self.text_vec_layer_en = layers.TextVectorization(
                Configure.VOCAB_SIZE, output_sequence_length=Configure.MAX_LENGTH
            )
        else:
            self.text_vec_layer_en = layers.TextVectorization.from_config(en_config)

        if es_config is None:
            self.text_vec_layer_es = layers.TextVectorization(
                Configure.VOCAB_SIZE, output_sequence_length=Configure.MAX_LENGTH
            )
        else:
            self.text_vec_layer_es= layers.TextVectorization.from_config(es_config)

        self.adapted = False
        self.sos = Configure.SOS
        self.eos = Configure.EOS

    def adapt(self, en_sentences: list[str], es_sentences: list[str]) -> None:
        self.text_vec_layer_en.adapt(en_sentences)
        self.text_vec_layer_es.adapt([f"{self.sos} {s} {self.eos}" for s in es_sentences])
        self.adapted = True

    def en_vocabulary(self):
        return self.text_vec_layer_en.get_vocabulary()

    def es_vocabulary(self):
        return self.text_vec_layer_es.get_vocabulary()

    def vectorize_en(self, en_sentences: list[str]):
        return self.text_vec_layer_en(en_sentences)

    def vectorize_es(self, es_sentences: list[str]):
        return self.text_vec_layer_es(es_sentences)

    @classmethod
    def from_config(cls, config):
        return cls(**config)

    def get_config(self):
        en_config = self.text_vec_layer_en.get_config()
        en_config['vocabulary'] = self.en_vocabulary()
        es_config = self.text_vec_layer_es.get_config()
        es_config['vocabulary'] = self.es_vocabulary()
        return {'en_config': en_config,
                'es_config': es_config}

    def save(self, filepath: str):
        if not self.adapted:
            raise RuntimeError("Layer hasn't been adapted yet.")
        if filepath is None:
            raise ValueError("A file path needs to be defined.")
        if not filepath.endswith('.pkl'):
            raise ValueError("The file path needs to end in .pkl.")
        pickle.dump({
            'config': self.get_config()
        }, open(filepath, 'wb'))

    @classmethod    
    def load(cls, filepath: str):
        conf = pickle.load(open(filepath, 'rb'))
        instance = cls(**conf['config'])
        return instance

Let me explain what this module does:

  • Since we need to vectorize both the original text and the target text, this module includes two TextVectorization Layers.
  • After adapting, this module will hold the vocabularies for both the original and target texts. This way, when deploying to the production system, the TextVectorization won’t need to adapt again.
  • The module uses the pickle module to enable persistence. You can use the get_config method to get the configuration of the two TextVectorization Layers and save it. You can also use from_config to initialize the module’s instance from the saved configuration directly.
  • However, when I used the get_config method, the vocabulary wasn’t retrieved (currently, I’m using Keras version 3.3, and I’m not sure if this is a bug), so I had to use the get_vocabulary method to get the vocabulary separately.

Let’s adapt the text and save the vocabulary:

text_preprocessor = TextPreprocessor()
text_preprocessor.adapt(sentence_en, sentence_es)
text_preprocessor.save('./data/text_preprocessor.pkl')

Check the vocabularies for both languages:

In:   text_preprocessor.en_vocabulary()[:10]
Out:  ['', '[UNK]', 'i', 'the', 'to', 'you', 'tom', 'a', 'is', 'he']

In:   text_preprocessor.es_vocabulary()[:10]
Out:  ['', '[UNK]', 'startofseq', 'endofseq', 'de', 'que', 'no', 'tom', 'a', 'la']

No problem at all.

Once the TextPreprocessor module is ready, we can start splitting the training and validation sets and begin the vectorization work. Since the target text also serves as input for the decoder module, we have two additional feature sets: X_train_dec and X_valid_dec:

X_train = text_preprocessor.vectorize_en(sentence_en[:100_000])
X_valid = text_preprocessor.vectorize_en(sentence_en[100_000:])

X_train_dec = text_preprocessor.vectorize_es([f"{Configure.SOS} {s}" for s in sentence_es[:100_000]])
X_valid_dec = text_preprocessor.vectorize_es([f"{Configure.SOS} {s}" for s in sentence_es[100_000:]])

y_train = text_preprocessor.vectorize_es([f"{s} {Configure.EOS}" for s in sentence_es[:100_000]])
y_valid = text_preprocessor.vectorize_es([f"{s} {Configure.EOS}" for s in sentence_es[100_000:]])

Implementing the encoder-decoder model

As depicted in the architecture diagram earlier, the entire model is divided into encoder and decoder parts. So, we implement two custom subclasses based on keras.layers.Layer for each part.

It’s important to implement the __init__, call, and get_config methods for each custom Layer.

  • The __init__ method initializes the Layer’s member variables, weights, and sub-layers.
  • The call method works similarly to Keras’s Functional API, accepting inputs as parameters and returning the Layer’s output after processing.
  • The get_config method is used to retrieve the configuration of the Layer when saving the model.

Encoder Layer:

@keras.saving.register_keras_serializable()
class Encoder(keras.layers.Layer):
    def __init__(self, embed_size: int = 128, **kwargs):
        super().__init__(**kwargs)
        self.embed_size = embed_size

        self.encoder_embedding_layer = layers.Embedding(input_dim=Configure.VOCAB_SIZE, 
                                                        output_dim=self.embed_size,
                                                        mask_zero=True)
        self.encoder = layers.LSTM(512, return_state=True)

    def call(self, inputs):
        encoder_embeddings = self.encoder_embedding_layer(inputs)
        encoder_outputs, *encoder_state = self.encoder(encoder_embeddings)
        return encoder_outputs, encoder_state

    def get_config(self):
        config = {"embed_size": self.embed_size}
        base_config = super().get_config()
        return config | base_config

In the Encoder, we set the return_state parameter of LSTM to True. This allows the final state of the LSTM to be returned as output for the Decoder Layer to use.

Decoder Layer:

@keras.saving.register_keras_serializable()
class Decoder(keras.layers.Layer):
    def __init__(self, embed_size: int = 128, **kwargs):
        super().__init__(**kwargs)
        self.embed_size = embed_size

        self.decoder_embedding_layer = layers.Embedding(input_dim=Configure.VOCAB_SIZE,
                                                        output_dim=self.embed_size,
                                                        mask_zero=True)
        self.decoder = layers.LSTM(512, return_sequences=True)

    def call(self, inputs, initial_state=None):
        decoder_embeddings = self.decoder_embedding_layer(inputs)
        decoder_outputs = self.decoder(decoder_embeddings,
                                       initial_state=initial_state)
        return decoder_outputs

    def get_config(self):
        config = {"embed_size": self.embed_size}
        base_config = super().get_config()
        return config | base_config

In the Decoder, besides receiving the data input, the call method also accepts the Encoder's final state through the initial_state parameter and returns the module's output.

We also implement a custom Model, which needs to implement the __init__, call, and get_config methods, similar to keras.layers.Layer.

@keras.saving.register_keras_serializable()
class NMTModel(keras.models.Model):
    embed_size: int = 128

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.encoder = Encoder(self.embed_size)
        self.decoder = Decoder(self.embed_size)

        self.out = layers.Dense(Configure.VOCAB_SIZE, activation='softmax')

    def call(self, inputs):
        encoder_inputs, decoder_inputs = inputs

        encoder_outputs, encoder_state = self.encoder(encoder_inputs)
        decoder_outputs = self.decoder(decoder_inputs, initial_state=encoder_state)
        out_proba = self.out(decoder_outputs)
        return out_proba

    def get_config(self):
        base_config = super().get_config()
        return base_config
  • In the Model, we initialize a Dense layer to convert the Decoder’s output into results for the word vectors.
  • The call method takes two inputs, which can be easily distinguished through unpacking.
  • Both Layer and Model need to have the @keras.saving.register_keras_serializable() decorator to ensure correct serialization when saving the model.

Model training

After defining the model, we proceed to the training phase:

nmt_model = NMTModel()
nmt_model.compile(loss='sparse_categorical_crossentropy',
                  optimizer='nadam',
                  metrics=['accuracy'])
checkpoint = keras.callbacks.ModelCheckpoint(
    './data/nmt_model.keras',
    monitor='val_accuracy',
    save_best_only=True
)
nmt_model.fit((X_train, X_train_dec), y_train, epochs=1,
              validation_data=((X_valid, X_valid_dec), y_valid),
              batch_size=128,
              callbacks=[checkpoint])

In this part of the code:

  • We first call the compile method to compile the model instance, defining components such as loss, optimizer, and metrics.
  • We set up a ModelCheckpoint callback to save the model with the best val_accuracy after training.
  • We use the fit method, passing X_train and X_train_dec as a tuple to the x parameter, and handle validation_data similarly.
  • This is just a demo, so I set epochs to 1. You can adjust the values of epochs and batch_size as needed.
  • Keras 3.0 also supports Pytorch’s DataLoader, or you can implement a backend-agnostic preprocessing pipeline based on [keras.utils.PyDataset](https://keras.io/api/utils/python_utils/?ref=dataleadsfuture.com#pydataset-class). I can explain how to use these in my next article; a minimal sketch of the PyDataset approach follows below.
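
Here is a minimal sketch of what the PyDataset approach could look like for this project. The NMTDataset class is hypothetical, and I convert the vectorized tensors to NumPy arrays so the batches stay backend-agnostic; treat it as a starting point rather than the final pipeline.

import math

import keras
import numpy as np

class NMTDataset(keras.utils.PyDataset):
    def __init__(self, X_enc, X_dec, y, batch_size=128, **kwargs):
        super().__init__(**kwargs)
        # Convert to NumPy so the batches don't depend on any specific backend
        self.X_enc = np.asarray(X_enc)
        self.X_dec = np.asarray(X_dec)
        self.y = np.asarray(y)
        self.batch_size = batch_size

    def __len__(self):
        # Number of batches per epoch
        return math.ceil(len(self.y) / self.batch_size)

    def __getitem__(self, idx):
        # Return one batch as ((encoder_input, decoder_input), target)
        sl = slice(idx * self.batch_size, (idx + 1) * self.batch_size)
        return (self.X_enc[sl], self.X_dec[sl]), self.y[sl]

# Hypothetical usage with the arrays prepared earlier:
# nmt_model.fit(NMTDataset(X_train, X_train_dec, y_train), epochs=1)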

After training is complete, the model should be saved.

Inference task

After training, you can deploy the corresponding code modules, along with the saved vocabulary and model, to the production system for inference tasks.

Since the model’s Dense layer outputs the probability of each word vector in the vocabulary, you need to merge each inferred word with the previous results and re-input them with the original text to predict the next word:

preprocessor = TextPreprocessor.load('./data/text_preprocessor.pkl')
nmt_model = keras.saving.load_model('./data/nmt_model.keras')

def translate(sentence_en):
    translation = ""
    for word_index in range(50):
        X = preprocessor.vectorize_en([sentence_en])
        X_dec = preprocessor.vectorize_es([Configure.SOS + " " + translation])
        y_proba = nmt_model.predict((X, X_dec), verbose=0)[0, word_index]
        predicted_word_id = np.argmax(y_proba)
        predicted_word = preprocessor.es_vocabulary()[predicted_word_id]
        if predicted_word == Configure.EOS:
            break
        translation = translation + " " + predicted_word
    return translation.strip()

Let’s write a simple method to test the results:

In:   translate("It was pretty cool.")
Out:  'era bastante [UNK]'

Although it’s not very accurate, the goal of this article is to learn how to use the Keras 3.0 subclassing API, so you still have plenty of room to optimize this model, right?


Conclusion

The release of Keras 3.0 allows us to implement models efficiently using Keras’s concise API while using Pytorch or Jax as backends.

However, since the version was released recently, the accompanying documentation is not yet complete, so you might encounter some difficulties in trying new versions.

This article, through an end-to-end practical example, explains the environment setup and basic development process of Keras 3.0, helping you get started quickly.

Unfortunately, the Keras 3.0 project is still in its early stages and cannot yet break away from its dependence on TensorFlow, nor from some of TensorFlow's inexplicable issues.

But I am still optimistic about this version. I believe that as time goes on and support for multiple backends improves, Keras will be revitalized, helping to make Deep Learning technology more accessible and reducing the learning curve for deep learning.

What else would you like to know about Keras 3.0? Feel free to leave a comment and discuss.


Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome – let’s discuss in the comments below!

This article was originally published on Data Leads Future.



The post Keras 3.0 Tutorial: End-to-End Deep Learning Project Guide appeared first on Towards Data Science.

]]>
Scikit-learn Visualization Guide: Making Models Speak https://towardsdatascience.com/scikit-learn-visualization-guide-making-models-speak-6ee32a3c5c04/ Thu, 21 Mar 2024 14:06:22 +0000 https://towardsdatascience.com/scikit-learn-visualization-guide-making-models-speak-6ee32a3c5c04/ Use the Display API to replace complex Matplotlib code

The post Scikit-learn Visualization Guide: Making Models Speak appeared first on Towards Data Science.

]]>
Introduction

In the journey of Machine Learning, explaining models with visualization is as important as training them.

A good chart can show us what a model is doing in an easy-to-understand way. Here’s an example:

Decision boundaries of two different generalization performances. Image by Author
Decision boundaries of two different generalization performances. Image by Author

This graph makes it clear that for the same dataset, the model on the right is better at generalizing.

Most machine learning books prefer to use raw Matplotlib code for visualization, which leads to issues:

  1. You have to learn a lot about drawing with Matplotlib.
  2. Plotting code fills up your notebook, making it hard to read.
  3. Sometimes you need third-party libraries, which isn’t ideal in business settings.

Good news! Scikit-learn now offers Display classes that let us use methods like from_estimator and from_predictions to make drawing graphs for different situations much easier.

Curious? Let me show you these cool APIs.


Scikit-learn Display API Introduction

Use utils.discovery.all_displays to find available APIs

Scikit-learn (sklearn) always adds Display APIs in new releases, so it’s key to know what’s available in your version.

Sklearn’s [utils.discovery.all_displays](https://scikit-learn.org/stable/modules/generated/sklearn.utils.discovery.all_displays.html?ref=dataleadsfuture.com#sklearn.utils.discovery.all_displays) lets you see which classes you can use.

Python">from sklearn.utils.discovery import all_displays

displays = all_displays()
displays

For example, in my Scikit-learn 1.4.0, these classes are available:

[('CalibrationDisplay', sklearn.calibration.CalibrationDisplay),
 ('ConfusionMatrixDisplay',
  sklearn.metrics._plot.confusion_matrix.ConfusionMatrixDisplay),
 ('DecisionBoundaryDisplay',
  sklearn.inspection._plot.decision_boundary.DecisionBoundaryDisplay),
 ('DetCurveDisplay', sklearn.metrics._plot.det_curve.DetCurveDisplay),
 ('LearningCurveDisplay', sklearn.model_selection._plot.LearningCurveDisplay),
 ('PartialDependenceDisplay',
  sklearn.inspection._plot.partial_dependence.PartialDependenceDisplay),
 ('PrecisionRecallDisplay',
  sklearn.metrics._plot.precision_recall_curve.PrecisionRecallDisplay),
 ('PredictionErrorDisplay',
  sklearn.metrics._plot.regression.PredictionErrorDisplay),
 ('RocCurveDisplay', sklearn.metrics._plot.roc_curve.RocCurveDisplay),
 ('ValidationCurveDisplay',
  sklearn.model_selection._plot.ValidationCurveDisplay)]

Using inspection.DecisionBoundaryDisplay for decision boundaries

Since we mentioned it, let’s start with decision boundaries.

If you use Matplotlib to draw them, it’s a hassle:

  • Use np.linspace to set coordinate ranges;
  • Use plt.meshgrid to calculate the grid;
  • Use plt.contourf to draw the decision boundary fill;
  • Then use plt.scatter to plot data points.

Now, with [inspection.DecisionBoundaryDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.inspection.DecisionBoundaryDisplay.html?ref=dataleadsfuture.com#sklearn-inspection-decisionboundarydisplay), you can simplify this process:

from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.datasets import load_iris
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

iris = load_iris(as_frame=True)
X = iris.data[['petal length (cm)', 'petal width (cm)']]
y = iris.target

svc_clf = make_pipeline(StandardScaler(), 
                        SVC(kernel='linear', C=1))
svc_clf.fit(X, y)

display = DecisionBoundaryDisplay.from_estimator(svc_clf, X, 
                                                 grid_resolution=1000,
                                                 xlabel="Petal length (cm)",
                                                 ylabel="Petal width (cm)")
plt.scatter(X.iloc[:, 0], X.iloc[:, 1], c=y, edgecolors='w')
plt.title("Decision Boundary")
plt.show()

See the final effect in the figure:

Use DecisionBoundaryDisplay to draw a triple classification model. Image by Author
Use DecisionBoundaryDisplay to draw a triple classification model. Image by Author

Remember, Display classes can only draw in 2D, so make sure your data has only two features or has been reduced to two dimensions.
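
If your dataset has more than two features, one common option is to project it down to two dimensions first and then fit and plot on the projected data. This is only a sketch; X stands for your own feature matrix.

from sklearn.decomposition import PCA

X_2d = PCA(n_components=2).fit_transform(X)  # X: your original feature matrix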

Using calibration.CalibrationDisplay for probability calibration

To compare classification models, probability calibration curves show how confident models are in their predictions.

Note that [CalibrationDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.calibration.CalibrationDisplay.html?ref=dataleadsfuture.com#sklearn.calibration.CalibrationDisplay) uses the model’s predict_proba. If you use a support vector machine, set probability to True:

from sklearn.calibration import CalibrationDisplay
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.ensemble import HistGradientBoostingClassifier

X, y = make_classification(n_samples=1000,
                           n_classes=2, n_features=5,
                           random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, 
                                            test_size=0.3, random_state=42)
proba_clf = make_pipeline(StandardScaler(), 
                          SVC(kernel="rbf", gamma="auto", 
                              C=10, probability=True))
proba_clf.fit(X_train, y_train)

CalibrationDisplay.from_estimator(proba_clf, 
                                            X_test, y_test)

hist_clf = HistGradientBoostingClassifier()
hist_clf.fit(X_train, y_train)

ax = plt.gca()
CalibrationDisplay.from_estimator(hist_clf,
                                  X_test, y_test,
                                  ax=ax)
plt.show()
Charts drawn by CalibrationDisplay. Image by Author
Charts drawn by CalibrationDisplay. Image by Author

Using metrics.ConfusionMatrixDisplay for confusion matrices

When assessing classification models and dealing with imbalanced data, we look at precision and recall.

These break down into TP, FP, TN, and FN – a confusion matrix.

To draw one, use [metrics.ConfusionMatrixDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.ConfusionMatrixDisplay.html?ref=dataleadsfuture.com#sklearn-metrics-confusionmatrixdisplay). It’s well-known, so I’ll skip the details.

from sklearn.datasets import fetch_openml
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import ConfusionMatrixDisplay

digits = fetch_openml('mnist_784', version=1)
X, y = digits.data, digits.target
rf_clf = RandomForestClassifier(max_depth=5, random_state=42)
rf_clf.fit(X, y)

ConfusionMatrixDisplay.from_estimator(rf_clf, X, y)
plt.show()
Charts drawn with ConfusionMatrixDisplay. Image by Author
Charts drawn with ConfusionMatrixDisplay. Image by Author

metrics.RocCurveDisplay and metrics.DetCurveDisplay

These two are together because they’re often used to evaluate side by side.

[RocCurveDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.RocCurveDisplay.html?ref=dataleadsfuture.com#sklearn.metrics.RocCurveDisplay) compares TPR and FPR for the model.

For binary classification, you want low FPR and high TPR, so the upper left corner is best. The Roc curve bends towards this corner.

Because the Roc curve stays near the upper left, leaving the lower right empty, it’s hard to see model differences.

So, we also use [DetCurveDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.DetCurveDisplay.html?ref=dataleadsfuture.com#sklearn.metrics.DetCurveDisplay) to draw a Det curve with FNR and FPR. It uses more space, making it clearer than the Roc curve.

The perfect point for a Det curve is the lower left corner.

from sklearn.metrics import RocCurveDisplay
from sklearn.metrics import DetCurveDisplay

X, y = make_classification(n_samples=10_000, n_features=5,
                           n_classes=2, n_informative=2)
X_train, X_test, y_train, y_test = train_test_split(X, y, 
                                             test_size=0.3, random_state=42,
                                                   stratify=y)

classifiers = {
    "SVC": make_pipeline(StandardScaler(), 
                        SVC(kernel="linear", C=0.1, random_state=42)),
    "Random Forest": RandomForestClassifier(max_depth=5, random_state=42)
}

fig, [ax_roc, ax_det] = plt.subplots(1, 2, figsize=(10, 4))
for name, clf in classifiers.items():
    clf.fit(X_train, y_train)

    RocCurveDisplay.from_estimator(clf, X_test, y_test, ax=ax_roc, name=name)
    DetCurveDisplay.from_estimator(clf, X_test, y_test, ax=ax_det, name=name)
Comparison Chart of RocCurveDisplay and DetCurveDisplay. Image by Author
Comparison Chart of RocCurveDisplay and DetCurveDisplay. Image by Author

Using metrics.PrecisionRecallDisplay to adjust thresholds

With imbalanced data, you might want to shift recall and precision.

  • For email fraud, you want high precision.
  • For disease screening, you want high recall to catch more cases.

You can adjust the threshold, but what’s the right amount?

Here, [metrics.PrecisionRecallDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.PrecisionRecallDisplay.html?ref=dataleadsfuture.com#sklearn-metrics-precisionrecalldisplay) can help.

from xgboost import XGBClassifier
from sklearn.datasets import load_wine
from sklearn.metrics import PrecisionRecallDisplay

wine = load_wine()
X, y = wine.data[wine.target<=1], wine.target[wine.target<=1]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3,
                                               stratify=y, random_state=42)

xgb_clf = XGBClassifier()
xgb_clf.fit(X_train, y_train)

PrecisionRecallDisplay.from_estimator(xgb_clf, X_test, y_test)
plt.show()
Charting xgboost model evaluation using PrecisionRecallDisplay. Image by Author
Charting xgboost model evaluation using PrecisionRecallDisplay. Image by Author

This shows that any model following Scikit-learn's estimator design can be drawn this way, like xgboost here. Handy, right?
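
As for actually picking a threshold from the curve, the Display class only draws it. One way to act on it, shown here as a sketch with an arbitrary 0.95 precision target, is to compute the raw curve with precision_recall_curve and take the first threshold that reaches your target:

import numpy as np
from sklearn.metrics import precision_recall_curve

y_scores = xgb_clf.predict_proba(X_test)[:, 1]
precisions, recalls, thresholds = precision_recall_curve(y_test, y_scores)

target_precision = 0.95
idx = np.argmax(precisions[:-1] >= target_precision)  # first threshold meeting the target
print(f"threshold={thresholds[idx]:.3f}, recall={recalls[idx]:.3f}")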

Using metrics.PredictionErrorDisplay for regression models

We’ve talked about classification, now let’s talk about regression.

Scikit-learn’s [metrics.PredictionErrorDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.PredictionErrorDisplay.html?ref=dataleadsfuture.com#sklearn-metrics-predictionerrordisplay) helps assess regression models.

import numpy as np
from sklearn.svm import SVR
from sklearn.metrics import PredictionErrorDisplay

rng = np.random.default_rng(42)
X = rng.random(size=(200, 2)) * 10
y = X[:, 0]**2 + 5 * X[:, 1] + 10 + rng.normal(loc=0.0, scale=0.1, size=(200,))

reg = make_pipeline(StandardScaler(), SVR(kernel='linear', C=10))
reg.fit(X, y)

fig, axes = plt.subplots(1, 2, figsize=(8, 4))
PredictionErrorDisplay.from_estimator(reg, X, y, ax=axes[0], kind="actual_vs_predicted")
PredictionErrorDisplay.from_estimator(reg, X, y, ax=axes[1], kind="residual_vs_predicted")
plt.show()
Two charts were drawn by PredictionErrorDisplay. Image by Author

As shown, it can draw two kinds of graphs. The left shows predicted vs. actual values – good for linear regression.

However, not all data is perfectly linear. For that, use the right graph.

It plots the residuals (actual minus predicted) against the predicted values, which is a residuals plot.

This plot’s banana shape suggests our data might not fit linear regression.

Switching from a linear to an rbf kernel can help.

reg = make_pipeline(StandardScaler(), SVR(kernel='rbf', C=10))
A visual demonstration of the improved model performance. Image by Author

See, with rbf, the residual plot looks better.

Using model_selection.LearningCurveDisplay for learning curves

After assessing performance, let’s look at optimization with [LearningCurveDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.LearningCurveDisplay.html?ref=dataleadsfuture.com#sklearn.model_selection.LearningCurveDisplay).

First up, learning curves: they show how well the model generalizes as the training set grows, and whether it suffers more from variance or from bias.

As shown below, we compare a DecisionTreeClassifier and a GradientBoostingClassifier to see how they do as training data changes.

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import LearningCurveDisplay

X, y = make_classification(n_samples=1000, n_classes=2, n_features=10,
                           n_informative=2, n_redundant=0, n_repeated=0)

tree_clf = DecisionTreeClassifier(max_depth=3, random_state=42)
gb_clf = GradientBoostingClassifier(n_estimators=50, max_depth=3, tol=1e-3)

train_sizes = np.linspace(0.4, 1.0, 10)
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
LearningCurveDisplay.from_estimator(tree_clf, X, y,
                                    train_sizes=train_sizes,
                                    ax=axes[0],
                                    scoring='accuracy')
axes[0].set_title('DecisionTreeClassifier')
LearningCurveDisplay.from_estimator(gb_clf, X, y,
                                    train_sizes=train_sizes,
                                    ax=axes[1],
                                    scoring='accuracy')
axes[1].set_title('GradientBoostingClassifier')
plt.show()
Comparison of the learning curve of two different models. Image by Author

The graph shows that although the tree-based GradientBoostingClassifier maintains good accuracy on the training data, its generalization capability on test data does not have a significant advantage over the DecisionTreeClassifier.

Using model_selection.ValidationCurveDisplay for visualizing parameter tuning

So, for models that don’t generalize well, you might try adjusting the model’s regularization parameters to tweak its performance.

The traditional approach is to use tools like GridSearchCV or Optuna to tune the model, but these methods only give you the overall best-performing model and the tuning process is not very intuitive.

For scenarios where you want to adjust a specific parameter to test its effect on the model, I recommend using [model_selection.ValidationCurveDisplay](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.ValidationCurveDisplay.html?ref=dataleadsfuture.com#sklearn.model_selection.ValidationCurveDisplay) to visualize how the model performs as the parameter changes.

from sklearn.model_selection import ValidationCurveDisplay
from sklearn.linear_model import LogisticRegression

param_name, param_range = "C", np.logspace(-8, 3, 10)
lr_clf = LogisticRegression()

ValidationCurveDisplay.from_estimator(lr_clf, X, y,
                                      param_name=param_name,
                                      param_range=param_range,
                                      scoring='f1_weighted',
                                      cv=5, n_jobs=-1)
plt.show()
Fine-tuning of model parameters plotted with ValidationCurveDisplay. Image by Author

Some regrets

After trying out all these Displays, I must admit some regrets:

  • The biggest one is that most of these APIs lack detailed tutorials, which is probably why they’re not well-known compared to Scikit-learn’s thorough documentation.
  • These APIs are scattered across various packages, making it hard to reference them from a single place.
  • The code is still pretty basic. You often need to pair it with Matplotlib’s APIs to get the job done. A typical example is DecisionBoundaryDisplay, where after plotting the decision boundary, you still need Matplotlib to plot the data distribution.
  • They’re hard to extend. Beyond some parameter validation, these classes offer few hooks for customization, so to streamline my own model visualization process I end up rewriting a lot.

I hope these APIs get more attention, and as versions upgrade, visualization APIs become even easier to use.


Conclusion

In the journey of machine learning, explaining models with visualization is as important as training them.

This article introduced various plotting APIs in the current version of scikit-learn.

With these APIs, you can simplify some Matplotlib code, ease your learning curve, and streamline your model evaluation process.

Due to length, I didn’t expand on each API. If interested, you can check the official documentation for more details.

Now it’s your turn. What are your expectations for visualizing machine learning methods? Feel free to leave a comment and discuss.


Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome – let’s discuss in the comments below!

This article was originally published on Data Leads Future.

The post Scikit-learn Visualization Guide: Making Models Speak appeared first on Towards Data Science.

]]>
Visualizing What Batch Normalization Is and Its Advantages https://towardsdatascience.com/visualizing-what-batch-normalization-is-and-its-advantages-a49bbcd2fd86/ Sat, 03 Feb 2024 05:41:10 +0000 https://towardsdatascience.com/visualizing-what-batch-normalization-is-and-its-advantages-a49bbcd2fd86/ Optimizing your neural network training with batch normalization

The post Visualizing What Batch Normalization Is and Its Advantages appeared first on Towards Data Science.

]]>
Visualizing what batch normalization is and its advantages. Image by Author

Introduction

Have you, when conducting Deep Learning projects, ever encountered a situation where the more layers your neural network has, the slower the training becomes?

If your answer is YES, then congratulations, it’s time for you to consider using batch normalization now.


What is Batch Normalization?

As the name suggests, batch normalization is a technique where batched training data, after activation in the current layer and before moving to the next layer, is standardized. Here’s how it works:

  1. The entire dataset is randomly divided into N batches without replacement, each of a fixed mini-batch size, for training.
  2. For the i-th batch, standardize the data distribution within the batch using the formula: (Xi - Xmean) / Xstd.
  3. Scale and shift the standardized data with γX̂i + β, where γ and β are learnable parameters, so the neural network can undo the effects of standardization if needed (see the sketch below).
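Here is a minimal NumPy sketch of these steps for a single mini-batch. It's only meant to make the formulas concrete: γ and β are the learnable parameters, and the small eps is my addition for numerical stability:

import numpy as np

def batch_norm_forward(x_batch, gamma, beta, eps=1e-5):
    # Step 2: standardize within the mini-batch, feature by feature
    batch_mean = x_batch.mean(axis=0)
    batch_std = x_batch.std(axis=0)
    x_hat = (x_batch - batch_mean) / (batch_std + eps)
    # Step 3: scale and shift with the learnable parameters
    return gamma * x_hat + beta

# Step 1 is the usual mini-batching; here is one fake batch of 32 samples with 4 features
rng = np.random.default_rng(0)
batch = rng.normal(loc=5.0, scale=3.0, size=(32, 4))
out = batch_norm_forward(batch, gamma=np.ones(4), beta=np.zeros(4))
print(out.mean(axis=0).round(3), out.std(axis=0).round(3))  # roughly 0 and 1 per feature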

The steps seem simple, don’t they? So, what are the advantages of batch normalization?


Advantages of Batch Normalization

Speeds up model convergence

Neural Networks commonly adjust parameters using gradient descent. If the cost function is smooth and has only one lowest point, the parameters will converge quickly along the gradient.

But if there’s a significant variance in the data distribution across nodes, the cost function becomes less like a pit bottom and more like a valley, making the convergence of the gradient exceptionally slow.

Confused? No worries, let’s explain this situation with a visual:

First, prepare a virtual dataset with only two features, where the distribution of features is vastly different, along with a target function:

import numpy as np

rng = np.random.default_rng(42)

A = rng.uniform(1, 10, 100)
B = rng.uniform(1, 200, 100)

y = 2*A + 3*B + rng.normal(size=100) * 0.1  # with a little bias

Then, with the help of GPT, we use Matplotlib's mplot3d toolkit to visualize the gradient descent situation before data standardization:

Visualization of cost functions without standardization of data. Image by Author

Notice anything? Because one feature’s span is too large, the function’s gradient is stretched long in the direction of this feature, creating a valley.

Now, for the gradient to reach the bottom of the cost function, it has to go through many more iterations.

But what if we standardize the two features first?

def normalize(X):
    mean = np.mean(X)
    std = np.std(X)
    return (X - mean)/std

A = normalize(A)
B = normalize(B)

Let’s look at the cost function after data standardization:

Visualization of standardized cost functions for data. Image by Author

Clearly, the function turns into the shape of a bowl. The gradient simply needs to descend along the slope to reach the bottom. Isn’t that much faster?

Slows down the problem of gradient vanishing

The graph we just used has already demonstrated this advantage, but let’s take a closer look.

Remember this function?

Visualization of sigmoid function. Image by Author

Yes, that’s the sigmoid function, which many neural networks use as an activation function.

Looking closely at the sigmoid function, we find that the slope is steepest between -2 and 2.

The slope of the sigmoid function is steepest between -2 and 2. Image by Author
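You can verify this numerically. The following quick check (my own, not part of the original figures) evaluates the sigmoid's derivative σ(x)(1 - σ(x)) at a few points; it peaks at x = 0 and flattens out quickly beyond ±4:

import numpy as np

x = np.linspace(-6, 6, 13)
sigmoid = 1 / (1 + np.exp(-x))
slope = sigmoid * (1 - sigmoid)  # derivative of the sigmoid

for xi, s in zip(x, slope):
    print(f"x={xi:+.0f}  slope={s:.3f}")
# The slope is largest (0.25) at x=0 and has almost vanished beyond |x| > 4.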

If we reduce the standardized data to a straight line, we’ll find that these data are distributed exactly within the steepest slope of the sigmoid. At this point, we can consider the gradient to be descending the fastest.

The normalized data will be distributed in the steepest interval of the sigmoid function. Image by Author

However, as the network goes deeper, the activated data will drift layer by layer (Internal Covariate Shift), and a large amount of data will be distributed away from the zero point, where the slope gradually flattens.

The distribution of data is progressively shifted within the neural network. Image by Author

At this point, the gradient descent becomes slower and slower, which is why with more neural network layers, the convergence becomes slower.

If we standardize the data of the mini-batch again after each layer's activation, the data for the current layer will return to the steeper slope area, and the problem of gradient vanishing can be greatly alleviated.

The renormalized data return to the region with the steepest slope. Image by Author
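In practice you rarely write this by hand: deep learning frameworks ship batch normalization as a ready-made layer. As a rough sketch, assuming you work in PyTorch and following the ordering described above (normalize after each activation), it might look like this; the layer sizes are made up:

import torch.nn as nn

model = nn.Sequential(
    nn.Linear(20, 64),
    nn.ReLU(),
    nn.BatchNorm1d(64),   # re-standardize the activated mini-batch
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.BatchNorm1d(32),
    nn.Linear(32, 1),
    nn.Sigmoid(),
)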

Has a regularizing effect

If we don’t batch the training and standardize the entire dataset directly, the data distribution would look like the following:

Distribution after normalizing the entire data set. Image by Author

However, since we divide the data into several batches and standardize the data according to the distribution within each batch, the data distribution will be slightly different.

Distribution of data sets after normalization by batch. Image by Author

You can see that the data distribution has some minor noise, similar to the noise introduced by Dropout, thus providing a certain level of regularization for the neural network.


Conclusion

Batch normalization is a technique that standardizes the data from different batches to accelerate the training of neural networks. It has the following advantages:

  • Speeds up model convergence.
  • Slows down the problem of gradient vanishing.
  • Has a regularizing effect.

Have you learned something new?

Now it’s your turn. What other techniques do you know that optimize neural network performance? Feel free to leave a comment and discuss.


Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome – let’s discuss in the comments below!

This article was originally published on Data Leads Future.

The post Visualizing What Batch Normalization Is and Its Advantages appeared first on Towards Data Science.

]]>
Ensuring Correct Use of Transformers in Scikit-learn Pipeline https://towardsdatascience.com/ensuring-correct-use-of-transformers-in-scikit-learn-pipelines-393566db7bfa/ Wed, 20 Dec 2023 07:08:42 +0000 https://towardsdatascience.com/ensuring-correct-use-of-transformers-in-scikit-learn-pipelines-393566db7bfa/ This article will explain how to use Pipeline and Transformers correctly in Scikit-Learn (sklearn) projects to speed up and reuse our model training process. This piece complements and clarifies the official documentation on Pipeline examples and some common misunderstandings. I hope that after reading this, you’ll be able to use the Pipeline, an excellent design, […]

The post Ensuring Correct Use of Transformers in Scikit-learn Pipeline appeared first on Towards Data Science.

]]>

This article will explain how to use Pipeline and Transformers correctly in Scikit-Learn (sklearn) projects to speed up and reuse our model training process.

This piece complements and clarifies the official documentation on Pipeline examples and some common misunderstandings.

I hope that after reading this, you’ll be able to use the Pipeline, an excellent design, to better complete your machine learning tasks.


Introduction

There’s a famous dish in Chinese restaurants around the world called "General Tso’s Chicken," and I wonder if you’ve tried it.

One characteristic of "General Tso’s Chicken" is that each piece of chicken is processed by the chef to be the same size. This ensures that:

  1. All pieces are marinated for the same amount of time.
  2. During cooking, each piece of chicken reaches the same level of doneness.
  3. When using chopsticks, the uniform size makes it easier to pick up the pieces.

This preprocessing includes washing, cutting, and marinating the ingredients. If the chicken pieces are cut larger than usual, the flavor can change significantly even if stir-fried for the same amount of time.

So, when preparing to open a restaurant, we must consider standardizing these processes and recipes to ensure that each plate of "General Tso’s Chicken" has a consistent taste and texture. This is how restaurants thrive.

Back in the world of machine learning, Scikit-Learn also provides such standardized processes called Pipeline. They solidify the data preprocessing and model training process into a standardized workflow, making machine learning projects easier to maintain and reuse.

In this article, we’ll explore how to use Transformers correctly within Scikit-Learn’s Pipeline, ensuring that our data is as perfectly prepared as the ingredients for a fine meal.


Why Use Transformers

What are Transformers

In Scikit-Learn, Transformers mainly fall into two categories: data scaling and feature dimensionality reduction.

Take, for example, a set of housing data, which includes features like location, area, and number of bedrooms.

If you don’t standardize these features to the same scale, the model might overlook the significant impact of location (usually categorical data) due to minor fluctuations in the area (usually a larger numerical value).

It’s like overpowering the delicate taste of herbs with too much pepper.

Using Transformers correctly

Typically, data scaling is done using standardization, formulated as:

X_scaled = (X - train_mean) / train_std

Where train_mean and train_std are variables extracted from the train data.

In Scikit-Learn, train data and test data are both obtained from the original dataset using the [train_test_split](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html?ref=dataleadsfuture.com#sklearn.model_selection.train_test_split) method.

When scaling test data, the same train_mean and train_std are used:

X_test_scaled = (X_test - train_mean) / train_std

Here arises the question: why use train data to generate these variables?

Let’s look at a simple dataset where the train data is:

After standardization, the train data becomes:

Clearly, after scaling, features greater than 0 have a label of 1, which means features greater than 10 before scaling have a label of 1.

Now let’s look at the test data:

If we use test_mean and test_std generated from the test data distribution without considering the train data, the results become:

Obviously, this prediction result does not make sense. But suppose we use train_mean and train_std to process the data and combine it with the model prediction; let’s see what happens:

As we can see, only by preprocessing the data with variables generated through train data can we ensure that the model’s prediction meets expectations.

Using Transformers in Scikit-Learn

Using Transformers in Scikit-Learn is quite simple.

We can generate a set of simulated data using make_classification and then split it into train and test with train_test_split.

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

X, y = make_classification(n_samples=100, n_features=2, 
                           n_classes=2, n_redundant=0, 
                           n_informative=2, n_clusters_per_class=1, 
                           random_state=42)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

Let’s look at the distribution of the data:

import matplotlib.pyplot as plt

plt.scatter(X_train[:, 0], X_train[:, 1], color='red', marker='o')
plt.scatter(X_test[:, 0], X_test[:, 1], color='green', marker='s')
plt.xlabel('feature_idx_0')
plt.ylabel('feature_idx_1')
plt.tight_layout()
plt.show()

Here we’re using StandardScaler to scale the features.

First, initialize the StandardScaler, then fit it with train data:

from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaler.fit(X_train)

Next, we can transform the train data’s features with the fitted Transformer:

X_train_std = scaler.transform(X_train)

Of course, we could also use fit_transform to fit and transform the train data in one go:

X_train_std = scaler.fit_transform(X_train)

Then we simply transform the test data without needing to fit it again:

X_test_std = scaler.transform(X_test)

After transformation, the distribution of the data remains unchanged, except for the change in scale:

Apart from scaling data with tools like [StandardScaler](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html?ref=dataleadsfuture.com#sklearn.preprocessing.StandardScaler) and [MinMaxScaler](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html?ref=dataleadsfuture.com#sklearn.preprocessing.MinMaxScaler), we can also use PCA, SelectKBest, etc., for dimensionality reduction. For the sake of brevity, I won’t delve into these here, but you’re welcome to consult the official documentation for more information.


Using Transformers in a Pipeline

Why use a Pipeline

As mentioned earlier, in a machine learning task, we often need to use various Transformers for data scaling and feature dimensionality reduction before training a model.

This presents several challenges:

  • Code complexity: For each use of a Transformer, we have to go through initialization, fit_transform, and transform steps. Missing one step during a transformation could derail the entire training process.
  • Data leakage: As we discussed, for each Transformer, we fit with train data and then transform both train and test data. We must avoid letting the distribution of the test data leak into the train data.
  • Code reusability: A machine learning model includes not only the trained Estimator for prediction but also the data preprocessing steps. Therefore, a machine learning task comprising Transformers and an Estimator should be atomic and indivisible.
  • Hyperparameter tuning: After setting up the steps of machine learning, we need to adjust hyperparameters to find the best combination of Transformer parameter values.

Scikit-Learn introduced the Pipeline module to solve these issues.

What is a Pipeline

A Pipeline is a module in Scikit-Learn that implements the chain of responsibility design pattern.

When creating a Pipeline, we use the steps parameter to chain together multiple Transformers for initialization:

from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier

pipeline = Pipeline(steps=[('scaler', StandardScaler()),
                           ('pca', PCA(n_components=2, random_state=42)),
                           ('estimator', RandomForestClassifier(n_estimators=3, max_depth=5))])

The official documentation points out that all steps except the last must be Transformers, while the last step can be an Estimator.

If you don’t need to specify each Transformer’s name, you can simplify the creation of a Pipeline with make_pipeline:

from sklearn.pipeline import make_pipeline

pipeline_2 = make_pipeline(StandardScaler(),
                           PCA(n_components=2, random_state=42),
                           RandomForestClassifier(n_estimators=3, max_depth=5))

Understanding the Pipeline’s mechanism from the source code

We’ve mentioned the importance of not letting test data variables leak into training data when using each Transformer.

This principle is relatively easy to ensure when each data preprocessing step is independent.

But what if we integrate these steps using a Pipeline?

If we look at the official documentation, we find it simply uses the fit method on the entire dataset without explaining how to handle train and test data separately.

With this question in mind, I dived into the Pipeline’s source code to find the answer.

Reading the source code revealed that although Pipeline implements fit, fit_transform, and predict methods, they work differently from regular Transformers.

Take the following Pipeline creation process as an example:

from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier

pipeline = Pipeline(steps=[('scaler', StandardScaler()),
                           ('pca', PCA(n_components=2, random_state=42)),
                           ('estimator', RandomForestClassifier(n_estimators=3, max_depth=5))])

The internal implementation can be represented by the following diagram:

As you can see, when we call the fit method, Pipeline first separates Transformers from the Estimator.

For each Transformer, Pipeline checks if there’s a fit_transform method; if so, it calls it; otherwise, it calls fit.

For the Estimator, it calls fit directly.

For the predict method, Pipeline separates Transformers from the Estimator.

Pipeline calls each Transformer’s transform method in sequence, followed by the Estimator’s predict method.
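To make that flow concrete, here is a heavily simplified sketch of the logic; it is my paraphrase, not the actual Scikit-Learn source:

def simplified_fit(steps, X, y):
    *transformer_steps, (_, estimator) = steps
    for _, transformer in transformer_steps:
        # Each transformer is fitted on the data it receives, then transforms it
        if hasattr(transformer, "fit_transform"):
            X = transformer.fit_transform(X, y)
        else:
            X = transformer.fit(X, y).transform(X)
    estimator.fit(X, y)

def simplified_predict(steps, X):
    *transformer_steps, (_, estimator) = steps
    for _, transformer in transformer_steps:
        X = transformer.transform(X)  # no refitting at prediction time
    return estimator.predict(X)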

Therefore, when using a Pipeline, we still need to split train and test data. Then we simply call fit on the train data and predict on the test data.

There’s a special case when combining Pipeline with GridSearchCV for hyperparameter tuning: you don’t need to manually split train and test data. I’ll explain this in more detail in the best practices section.


Best Practices for Using Transformers and Pipeline in Actual Applications

Now that we’ve discussed the working principles of Transformers and Pipeline, it’s time to fulfill the promise made in the title and talk about the best practices when combining Transformers with Pipeline in real projects.

Combining Pipeline with GridSearchCV for hyperparameter tuning

In a machine learning project, selecting the right dataset processing and algorithm is one aspect. After debugging the initial steps, it’s time for parameter optimization.

Using GridSearchCV or RandomizedSearchCV, you can try different parameters for the Estimator to find the best fit:

import time

from sklearn.model_selection import GridSearchCV

pipeline = Pipeline(steps=[('scaler', StandardScaler()),
                           ('pca', PCA()),
                           ('estimator', RandomForestClassifier())])
param_grid = {'pca__n_components': [2, 'mle'],
              'estimator__n_estimators': [3, 5, 7],
              'estimator__max_depth': [3, 5]}

start = time.perf_counter()
clf = GridSearchCV(pipeline, param_grid=param_grid, cv=5, n_jobs=4)
clf.fit(X, y)

# It takes 2.39 seconds to finish the search on my laptop.
print(f"It takes {time.perf_counter() - start} seconds to finish the search.")

But in machine learning, hyperparameter tuning is not limited to Estimator parameters; it also involves combinations of Transformer parameters.

Integrating all steps with Pipeline allows for hyperparameter tuning of every element with different parameter combinations.

Note that during hyperparameter tuning, we no longer need to manually split train and test data. GridSearchCV will split the data into training and validation sets using [StratifiedKFold](https://scikit-learn.org/stable/modules/cross_validation.html?ref=dataleadsfuture.com#stratified-k-fold), which implements a stratified k-fold cross-validation mechanism.

We can also set the number of folds for cross-validation and choose how many workers to use. The tuning process is illustrated in the following diagram:

Due to space constraints, I won’t go into detail about GridSearchCV and RandomizedSearchCV here. If you’re interested, I can write another article explaining them next time.

Using the memory parameter to cache Transformer outputs

Of course, hyperparameter tuning with GridSearchCV can be slow, but don't worry: Pipeline provides a caching mechanism that speeds up tuning by caching the results of intermediate steps.

When initializing a Pipeline, you can pass in a memory parameter, which will cache the results after the first call to fit and transform for each transformer.

If subsequent calls to fit and transform use the same parameters, which is very likely during hyperparameter tuning, these steps will directly read the results from the cache instead of recalculating, significantly speeding up the efficiency when running the same Transformer repeatedly.

The memory parameter can accept the following values:

  • The default is None: caching is not used.
  • A string: providing a path to store the cached results.
  • A joblib.Memory object: allows for finer-grained control, such as configuring the storage backend for the cache (see the sketch below).
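If you want that finer-grained control, a sketch with an explicit joblib.Memory object could look like this; the cache directory name is only an example:

from joblib import Memory

memory = Memory(location='./pipeline_cache', verbose=0)

pipeline_cached = Pipeline(steps=[('scaler', StandardScaler()),
                                  ('pca', PCA()),
                                  ('estimator', RandomForestClassifier())],
                           memory=memory)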

Next, let’s use the previous GridSearchCV example, this time adding memory to the Pipeline to see how much speed can be improved:

pipeline_m = Pipeline(steps=[('scaler', StandardScaler()),
                           ('pca', PCA()),
                           ('estimator', RandomForestClassifier())],
                      memory='./cache')
start = time.perf_counter()
clf_m = GridSearchCV(pipeline_m, param_grid=param_grid, cv=5, n_jobs=4)
clf_m.fit(X, y)

# It takes 0.22 seconds to finish the search with memory parameter.
print(f"It takes {time.perf_counter() - start} seconds to finish the search with memory.")

As shown, with caching, the tuning process only takes 0.2 seconds, a significant speed increase from the previous 2.4 seconds.

How to debug Scikit-Learn Pipeline

After integrating Transformers into a Pipeline, the entire preprocessing and transformation process becomes a black box. It can be difficult to understand which step the process is currently on.

Fortunately, we can solve this problem by adding logging to the Pipeline. We need to create custom transformers to add logging at each step of data transformation.

Here’s an example of adding logging with Python’s standard logging library:

First, you need to configure a logger:

import logging

from sklearn.base import BaseEstimator, TransformerMixin

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

Next, you can create a custom Transformer and add logging within its methods:

class LoggingTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, transformer):
        self.transformer = transformer
        self.real_name = self.transformer.__class__.__name__

    def fit(self, X, y=None):
        logging.info(f"Begin fit: {self.real_name}")
        self.transformer.fit(X, y)
        logging.info(f"End fit: {self.real_name}")
        return self

    def fit_transform(self, X, y=None):
        logging.info(f"Begin fit_transform: {self.real_name}")
        X_fit_transformed = self.transformer.fit_transform(X, y)
        logging.info(f"End fit_transform: {self.real_name}")
        return X_fit_transformed

    def transform(self, X):
        logging.info(f"Begin transform: {self.real_name}")
        X_transformed = self.transformer.transform(X)
        logging.info(f"End transform: {self.real_name}")
        return X_transformed

Then you can use this LoggingTransformer when creating your Pipeline:

pipeline_logging = Pipeline(steps=[('scaler', LoggingTransformer(StandardScaler())),
                             ('pca', LoggingTransformer(PCA(n_components=2))),
                             ('estimator', RandomForestClassifier(n_estimators=5, max_depth=3))])
pipeline_logging.fit(X_train, y_train)

When you use pipeline.fit, it will call the fit and transform methods for each step in turn and log the appropriate messages.

Use passthrough in Scikit-Learn Pipeline

In a Pipeline, a step can be set to 'passthrough', which means that for this specific step, the input data will pass through unchanged to the next step.

This is useful when you want to selectively enable/disable certain steps in a complex pipeline.
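You can also set the step to 'passthrough' directly when defining the Pipeline. A minimal sketch, reusing the imports and data from the earlier examples:

pipeline_no_scaling = Pipeline(steps=[('scaler', 'passthrough'),
                                      ('pca', PCA(n_components=2)),
                                      ('estimator', RandomForestClassifier(n_estimators=5, max_depth=3))])
pipeline_no_scaling.fit(X_train, y_train)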

Taking the code example above, we know that when using DecisionTree or RandomForest, standardizing the data is unnecessary, so we can use passthrough to skip this step.

An example would be as follows:

param_grid = {'scaler': ['passthrough'],
              'pca__n_components': [2, 'mle'],
              'estimator__n_estimators': [3, 5, 7],
              'estimator__max_depth': [3, 5]}
clf = GridSearchCV(pipeline, param_grid=param_grid, cv=5, n_jobs=4)
clf.fit(X, y)

Reusing the Pipeline

After a journey of trials and tribulations, we finally have a well-performing machine learning model.

Now, you might consider how to reuse this model, share it with colleagues, or deploy it in a production environment.

However, the result of a model’s training includes not only the model itself but also the various data processing steps, which all need to be saved.

Using joblib and Pipeline, we can save the entire training process for later use. The following code provides a simple example:

from joblib import dump, load

# save pipeline
dump(pipeline, 'model_pipeline.joblib')

# load pipeline
loaded_pipeline = load('model_pipeline.joblib')

# predict with loaded pipeline
loaded_predictions = loaded_pipeline.predict(X_test)

Conclusion

The official Scikit-Learn documentation is among the best I’ve seen. By learning its contents, you can master the basics of machine learning applications.

However, when using Scikit-Learn in real projects, we often encounter various details that the official documentation may not cover.

How to correctly combine Transformers with Pipeline is one such case. In this article, I introduced why to use Transformers and some typical application scenarios.

Then, I interpreted the working principle of Pipeline from the source code level and completed the reasonable use case when applied to train and test datasets.

Finally, for each stage of a real machine learning project, I introduced the best practices of combining Transformers with Pipeline based on my work experience.

I hope this article can help you. If you have any questions, please leave me a message, and I will try my best to answer them.


Thank you for reading my stories.

You can subscribe to get the latest Data Science stories from me.

Find me on LinkedIn or Twitter(X) if you have any questions.

This article was originally published on Data Leads Future.

The post Ensuring Correct Use of Transformers in Scikit-learn Pipeline appeared first on Towards Data Science.

]]>
How to Optimize Multidimensional Numpy Array Operations with Numexpr https://towardsdatascience.com/how-to-optimize-multidimensional-numpy-array-operations-with-numexpr-32ba9ea8e9a6/ Sun, 22 Oct 2023 16:29:47 +0000 https://towardsdatascience.com/how-to-optimize-multidimensional-numpy-array-operations-with-numexpr-32ba9ea8e9a6/ FAST COMPUTING This is a relatively brief article. In it, I will use a real-world scenario as an example to explain how to use Numexpr expressions in multidimensional Numpy arrays to achieve substantial performance improvements. There aren’t many articles explaining how to use Numexpr in multidimensional Numpy arrays and how to use Numexpr expressions, so […]

The post How to Optimize Multidimensional Numpy Array Operations with Numexpr appeared first on Towards Data Science.

]]>
FAST COMPUTING

This is a relatively brief article. In it, I will use a real-world scenario as an example to explain how to use Numexpr expressions in multidimensional Numpy arrays to achieve substantial performance improvements.

There aren’t many articles explaining how to use Numexpr in multidimensional Numpy arrays and how to use Numexpr expressions, so I hope this one will help you.


Introduction

Recently, while reviewing some of my old work, I stumbled upon this piece of code:

def predict(X, w, b):
    z = np.dot(X, w)
    y_hat = sigmoid(z)
    y_pred = np.zeros((y_hat.shape[0], 1))

    for i in range(y_hat.shape[0]):
        if y_hat[i, 0] < 0.5:
            y_pred[i, 0] = 0
        else:
            y_pred[i, 0] = 1
    return y_pred

This code transforms prediction results from probabilities to classification results of 0 or 1 in the logistic regression model of machine learning.

But heavens, who would use a for loop to iterate over Numpy ndarray?

You can foresee that when the data reaches a certain amount, it will not only occupy a lot of memory, but the performance will also be inferior.

That’s right, the person who wrote this code was me when I was younger.

With a sense of responsibility, I plan to rewrite this code with the Numexpr library today.

Along the way, I will show you how to use Numexpr and Numexpr’s where expression in multidimensional Numpy arrays to achieve significant performance improvements.


Code Implementation

If you are not familiar with the basic usage of Numexpr, you can refer to this article:

Exploring Numexpr: A Powerful Engine Behind Pandas

This article uses a real-world example to demonstrate the specific usage of Numexpr’s API and expressions in Numpy and Pandas.

where(bool, number1, number2): returns number1 where the bool condition is true, and number2 otherwise.

The above is the usage of Numexpr's where expression on Numpy arrays.
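To get a quick feel for the syntax, here is a tiny one-dimensional example; the array name and the 0.5 cutoff are just for illustration:

import numpy as np
import numexpr as ne

scores = np.array([0.1, 0.4, 0.6, 0.9])
labels = ne.evaluate("where(scores < 0.5, 0, 1)")
print(labels)  # [0 0 1 1]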

When dealing with matrix data, you may be used to using Pandas DataFrame. But since the eval method of Pandas does not support the where expression, you can only use Numexpr directly on the multidimensional Numpy ndarray.

Don’t worry, I’ll explain it to you right away.

Before starting, we need to import the necessary packages and implement a generate_ndarray method to generate a specific size ndarray for testing:

from typing import Callable
import time

import numpy as np
import numexpr as ne
import matplotlib.pyplot as plt

rng = np.random.default_rng(seed=4000)

def generate_ndarray(rows: int) -> np.ndarray:
    result_array = rng.random((rows, 1))
    return result_array

First, we generate a matrix of 200 rows to see if it is the test data we want:

In:  arr = generate_ndarray(200)
     print(f"The dimension of this array: {arr.ndim}")
     print(f"The shape of this array: {arr.shape}")

Out: The dimension of this array: 2
     The shape of this array: (200, 1)

To be close to the actual situation of the logistic regression model, we generate an ndarray of shape (200, 1). Of course, you can also test other shapes of ndarray according to your needs.

Then, we start writing the specific use of Numexpr in the numexpr_to_binary method:

  • First, we use the index to separate the columns that need to be processed.
  • Then, use the where expression of Numexpr to process the values.
  • Finally, merge the processed columns with other columns to generate the required results.

Since the ndarray’s shape here is (200, 1), there is only one column, so I add a new dimension.

The code is as follows:

def numexpr_to_binary(np_array: np.ndarray) -> np.ndarray:
    temp = np_array[:, 0]
    temp = ne.evaluate("where(temp<0.5, 0, 1)")
    return temp[:, np.newaxis]

We can test the result with an array of 10 rows to see if it is what I want:

arr = generate_ndarray(10)
result = numexpr_to_binary(arr)

mapping = np.column_stack((arr, result))
mapping

Look, the match is correct. Our task is completed.

The entire process can be demonstrated with the following figure:


Performance Comparison

After the code implementation, we need to compare the Numexpr implementation version with the previous for each implementation version to confirm that there has been a performance improvement.

First, we implement a numexpr_example method. This method is based on the implementation of Numexpr:

def numexpr_example(rows: int) -> np.ndarray:
    orig_arr = generate_ndarray(rows)
    the_result = numexpr_to_binary(orig_arr)
    return the_result

Then, we need to supplement a for_loop_example method. This method refers to the original code I need to rewrite and is used as a performance benchmark:

def for_loop_example(rows: int) -> np.ndarray:
    the_arr = generate_ndarray(rows)
    for i in range(the_arr.shape[0]):
        if the_arr[i][0] < 0.5:
            the_arr[i][0] = 0
        else:
            the_arr[i][0] = 1
    return the_arr

Then, I wrote a test method, time_method. This method generates arrays ranging from 1 row up to 10 to the 8th power rows, calls the corresponding method on each size, and finally saves the time required for the different data amounts:

def time_method(method: Callable):
    time_dict = dict()
    for i in range(9):
        begin = time.perf_counter()
        rows = 10 ** i
        method(rows)
        end = time.perf_counter()
        time_dict[i] = end - begin
    return time_dict

We test the numexpr version and the for_loop version separately, and use matplotlib to draw the time required for different amounts of data:

t_m = time_method(for_loop_example)
t_m_2 = time_method(numexpr_example)
plt.plot(t_m.keys(), t_m.values(), c="red", linestyle="solid")
plt.plot(t_m_2.keys(), t_m_2.values(), c="green", linestyle="dashed")
plt.legend(["for loop", "numexpr"])
plt.xlabel("exponent")
plt.ylabel("time")
plt.show()

It can be seen that when the number of rows of data is greater than 10 to the 6th power, the Numexpr version of the implementation has a huge performance improvement.


Conclusion

After explaining the basic usage of Numexpr in the previous article, this article uses a specific example in actual work to explain how to use Numexpr to rewrite existing code to obtain performance improvement.

This article mainly uses two features of Numexpr:

  1. Numexpr allows calculations to be performed in a vectorized manner.
  2. During the calculation of Numexpr, no new arrays will be generated, thereby significantly reducing memory usage.

Thank you for reading. If you have other solutions, please feel free to leave a message and discuss them with me.


Thank you for reading my stories.

You can subscribe to get the latest Data Science stories from me.

Find me on LinkedIn or Twitter(X) if you have any questions.

This article was originally published on Data Leads Future.

The post How to Optimize Multidimensional Numpy Array Operations with Numexpr appeared first on Towards Data Science.

]]>
Exploring Numexpr: A Powerful Engine Behind Pandas https://towardsdatascience.com/exploring-numexpr-a-powerful-engine-behind-pandas-cdb94965ca3a/ Fri, 22 Sep 2023 03:56:30 +0000 https://towardsdatascience.com/exploring-numexpr-a-powerful-engine-behind-pandas-cdb94965ca3a/ Enhancing your data analysis performance with Python's Numexpr and Pandas' eval/query functions

The post Exploring Numexpr: A Powerful Engine Behind Pandas appeared first on Towards Data Science.

]]>
FAST COMPUTING
Use Numexpr to help me find the most livable city. Photo Credit: Created by Author, Canva

This article will introduce you to the Python library Numexpr, a tool that boosts the computational performance of Numpy Arrays. The eval and query methods of Pandas are also based on this library.

This article also includes a hands-on weather data analysis project.

By reading this article, you will understand the principles of Numexpr and how to use this powerful tool to speed up your calculations in reality.


Introduction

Recalling Numpy Arrays

In a previous article discussing Numpy Arrays, I used a library example to explain why Numpy’s Cache Locality is so efficient:

Python Lists Vs. NumPy Arrays: A Deep Dive into Memory Layout and Performance Benefits

Each time you go to the library to search for materials, you take out a few books related to the content and place them next to your desk.

This way, you can quickly check related materials without having to run to the shelf each time you need to read a book.

This method saves a lot of time, especially when you need to consult many related books.

In this scenario, the shelf is like your memory, the desk is equivalent to the CPU’s L1 cache, and you, the reader, are the CPU’s core.

When the CPU accesses RAM, the cache loads the entire cache line into the high-speed cache. Image by Author

The limitations of Numpy

Suppose you are unfortunate enough to encounter a demanding professor who wants you to take out Shakespeare and Tolstoy’s works for a cross-comparison.

At this point, taking out related books in advance will not work well.

First, your desk space is limited and cannot hold all the books of these two masters at the same time, not to mention the reading notes that will be generated during the comparison process.

Second, you’re just one person, and comparing so many works would take too long. It would be nice if you could find a few more people to help.

This is the current situation when we use Numpy to deal with large amounts of data:

  • The number of elements in the Array is too large to fit into the CPU’s L1 cache.
  • Numpy’s element-level operations are single-threaded and cannot utilize the computing power of multi-core CPUs.

What should we do?

Don’t worry. When you really encounter a problem with too much data, you can call on our protagonist today, Numexpr, to help.


Understanding Numexpr: What and Why

How it works

When Numpy encounters large arrays, element-wise calculation generally goes one of two ways, and each has its own bottleneck.

Let me give you an example to illustrate. Suppose there are two large Numpy ndarrays:

import numpy as np
import numexpr as ne

a = np.random.rand(100_000_000)
b = np.random.rand(100_000_000)

When calculating the result of the expression a**5 + 2 * b, there are generally two methods:

One way is Numpy’s vectorized calculation method, which uses two temporary arrays to store the results of a**5 and 2*b separately.

In: %timeit a**5 + 2 * b

Out:2.11 s ± 31.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

At this time, you have four arrays in your memory: a, b, a**5, and 2 * b. This method will cause a lot of memory waste.

Moreover, since each Array’s size exceeds the CPU cache’s capacity, it cannot use it well.

Another way is to traverse each element in two arrays and calculate them separately.

c = np.empty(100_000_000, dtype=np.float64)  # float dtype so the results are not truncated

def calcu_elements(a, b, c):
    for i in range(0, len(a), 1):
        c[i] = a[i] ** 5 + 2 * b[i]

%timeit calcu_elements(a, b, c)

Out: 24.6 s ± 48.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This method performs even worse. The calculation will be very slow because it cannot use vectorized calculations and only partially utilize the CPU cache.

Numexpr’s calculation

Numexpr commonly uses only one evaluate method. This method will receive an expression string each time and then compile it into bytecode using Python’s compile method.

Numexpr also has a virtual machine program. The virtual machine contains multiple vector registers, each using a chunk size of 4096.

When Numexpr starts to calculate, it sends the data in one or more registers to the CPU’s L1 cache each time. This way, there won’t be a situation where the memory is too slow, and the CPU waits for data.

At the same time, Numexpr’s virtual machine is written in C, removing Python’s GIL. It can utilize the computing power of multi-core CPUs.

So, Numexpr is faster when calculating large arrays than using Numpy alone. We can make a comparison:

In:  %timeit ne.evaluate('a**5 + 2 * b')
Out: 258 ms ± 14.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Summary of Numexpr’s working principle

Let’s summarize the working principle of Numexpr and see why Numexpr is so fast:

Executing bytecode through a virtual machine. Numexpr uses bytecode to execute expressions, which can fully utilize the branch prediction ability of the CPU, which is faster than using Python expressions.

Vectorized calculation. Numexpr will use SIMD (Single Instruction, Multiple Data) technology to improve computing efficiency significantly for the same operation on the data in each register.

Multi-core parallel computing. Numexpr’s virtual machine can decompose each task into multiple subtasks. They are executed in parallel on multiple CPU cores.

Less memory usage. Unlike Numpy, which needs to generate intermediate arrays, Numexpr only loads a small amount of data when necessary, significantly reducing memory usage.

Workflow diagram of Numexpr. Image by Author

Numexpr and Pandas: A Powerful Combination

You might be wondering: We usually do data analysis with pandas. I understand the performance improvements Numexpr offers for Numpy, but does it have the same improvement for Pandas?

The answer is Yes.

The eval and query methods in pandas are implemented based on Numexpr. Let’s look at some examples:

Pandas.eval for Cross-DataFrame operations

When you have multiple pandas DataFrames, you can use pandas.eval to perform operations between DataFrame objects, for example:

import pandas as pd

# rng was not defined earlier in this article, so create a generator here
rng = np.random.default_rng()

nrows, ncols = 1_000_000, 100
df1, df2, df3, df4 = (pd.DataFrame(rng.random((nrows, ncols))) for i in range(4))

If you calculate the sum of these DataFrames using the traditional pandas method, the time consumed is:

In:  %timeit df1+df2+df3+df4
Out: 1.18 s ± 65.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

You can also use pandas.eval for calculation. The time consumed is:

In:  %timeit pd.eval('df1 + df2 + df3 + df4')
Out: 452 ms ± 29.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

The calculation of the eval version can improve performance by 50%, and the results are precisely the same:

In:  np.allclose(df1+df2+df3+df4, pd.eval('df1+df2+df3+df4'))
Out: True

DataFrame.eval for column-level operations

Just like pandas.eval, DataFrame also has its own eval method. We can use this method for column-level operations within DataFrame, for example:

df = pd.DataFrame(rng.random((1000, 3)), columns=['A', 'B', 'C'])

result1 = (df['A'] + df['B']) / (df['C'] - 1)
result2 = df.eval('(A + B) / (C - 1)')

The results of using the traditional pandas method and the eval method are precisely the same:

In:  np.allclose(result1, result2)
Out: True

Of course, you can also directly use the eval expression to add new columns to the DataFrame, which is very convenient:

df.eval('D = (A + B) / C', inplace=True)
df.head()
Directly use the eval expression to add new columns. Image by Author

Using DataFrame.query to quickly find data

If the eval method of DataFrame executes comparison expressions, the returned result is a boolean result that meets the conditions. You need to use Mask Indexing to get the desired data:

mask = df.eval('(A < 0.5) & (B < 0.5)')
result1 = df[mask]
result1
When filtering data with DataFrame.eval alone, it is necessary to use a boolean mask. Image by Author

The DataFrame.query method encapsulates this process, and you can directly obtain the desired data with the query method:

In:   result2 = df.query('A < 0.5 and B < 0.5')
      np.allclose(result1, result2)
Out:  True

When you need to use scalars in expressions, you can use the @ symbol to indicate:

In:  Cmean = df['C'].mean()
     result1 = df[(df.A < Cmean) & (df.B < Cmean)]
     result2 = df.query('A < @Cmean and B < @Cmean')
     np.allclose(result1, result2)
Out: True

Practical Example: Using Numexpr and Pandas in Real-World Scenarios

In all articles explaining Numexpr, examples are made using synthetic data. This feeling is not good and may cause you to not know how to use this powerful library to complete tasks after reading the article.

Therefore, in this article, I will take a weather data analysis project as an example to explain how we should use Numexpr to process large datasets in actual work.

Project Goal

After a hot summer, I really want to see if there is such a place where the climate is pleasant in summer and suitable for me to escape the heat.

This place should meet the following conditions:

  1. In the summer, the daily average temperature is between 18 and 22 degrees Celsius;
  2. The diurnal temperature difference is between 4 and 6 degrees Celsius;
  3. The average wind speed in km/h is between 6 and 10. It would feel nice to have a breeze blowing on me.

Data preparation

This time, I used the global major city weather data provided by the Meteostat JSON API.

The data is licensed under the Creative Commons Attribution-NonCommercial 4.0 International Public License (CC BY-NC 4.0) and can be used commercially.

I used the parquet dataset integrated on Kaggle based on the Meteostat JSON API for convenience.

I used version 2.0 of pandas. The pandas.read_parquet method of this version can easily read parquet data. But before reading, you need to install Pyarrow and Fastparquet.

conda install pyarrow
conda install fastparquet

Data analysis

After the preliminary preparations, we officially entered the data analysis process.

First, I read the data into memory and then look at the situation of this dataset:

import os
from pathlib import Path

import pandas as pd

root = Path(os.path.abspath("")).parents[0]
data = root/"data"

df = pd.read_parquet(data/"daily_weather.parquet")
df.info()
Overview of the dataset’s metadata. Image by Author

As shown in the figure, this dataset contains 13 fields. According to the goal of this project, I plan to use the fields of city_name, season, min_temp_c, max_temp_c, avg_wind_speed_kmh.

Next, I first remove the data in the corresponding fields that contain empty values for subsequent calculations, and then select the desired fields to form a new DataFrame:

sea_level_not_null = df.dropna(subset=['min_temp_c', 'max_temp_c', 'avg_wind_speed_kmh'] , how='any')

sample = sea_level_not_null[['city_name', 'season',
                             'min_temp_c', 'max_temp_c', 'avg_wind_speed_kmh']]

Since I need to calculate the average temperature and temperature difference, I use the Pandas.eval method to directly calculate the new indicators on the DataFrame:

sample.eval('avg_temp_c = (max_temp_c + min_temp_c) / 2', inplace=True)
sample.eval('diff_in_temp = max_temp_c - min_temp_c', inplace=True)

Then, average a few indicators by city_name and season:

sample = (sample.groupby(['city_name', 'season'])
          [['min_temp_c', 'max_temp_c', 'avg_temp_c', 'diff_in_temp', 'avg_wind_speed_kmh']]
          .mean().round(1).reset_index())

sample
Results after data cleaning and metric calculation. Image by Author

Finally, according to the goal of the project, I use DataFrame.query to filter the dataset:

sample.query('season=="Summer" 
        &amp; 18 < avg_temp_c < 22 
        &amp; 4 < diff_in_temp < 6 
        &amp; 6 < avg_wind_speed_kmh < 10')
Finally, we obtained the only result that met the criteria. Image by Author

The final result is out. Only one city meets my requirements: Vladivostok, a non-freezing port in the east of Russia. It is indeed an excellent place to escape the heat!


Best Practices and Takeaways

After explaining the project practice of Numexpr, as usual, I will explain some of the best practices of Numexpr combined with my own work experience for you.

Avoid overuse

Although Numexpr and pandas eval have significant performance advantages when handling large data sets. However, dealing with small data sets is not faster than regular operations.

Therefore, you should choose whether to use Numexpr based on the size and complexity of the data. And my experience is to use it when you feel the need, as small datasets won’t slow things down too much anyway.

The use of the eval function is limited

The eval function does not support all Python and pandas operations.

Therefore, before using it, you should consult the documentation to understand what operations eval supports.

Be careful when handling strings

Although I used season="Summer" to filter the dataset in the project practice, the eval function is not very fast when dealing with strings.

If you have a lot of string operations in your project, you need to consider other ways.

Be mindful of memory usage

Although Numexpr no longer generates intermediate arrays, large datasets will occupy a lot of memory.

For example, the dataset occupies 2.6G of memory in my project example. At this time, you have to be very careful to avoid the program crashing due to insufficient memory.

Use the appropriate data type

This point is detailed in the official documentation, so I won’t repeat it here.

Use the inplace parameter when needed

Using the inplace parameter of the DataFrame.eval method can directly modify the original dataset, avoiding generating a new dataset and occupying a lot of memory.

Of course, doing so will lead to modifications to the original dataset, so please be careful.


Conclusion

In this article, I brought a comprehensive tutorial on Numexpr, including:

The applicable scenarios of Numexpr, the effect of performance improvement, and its working principle.

The eval and query methods in Pandas are also based on Numexpr. It will bring great convenience and performance improvement to your pandas’ operations if used appropriately.

Through a global weather data analysis project, I demonstrated how to use pandas’ eval and query methods in practice.

As always, combined with my work experience, I introduced the best practices of Numexpar and the eval method of pandas.

Thank you for reading. If you have any questions, please leave a message in the comment area, and I will answer in time.


Let me start with the basics and walk you through the best scientific computing practices at work.

Fast Computing


Thank you for reading my stories.

You can subscribe to get the latest Data Science stories from me.

Find me on LinkedIn or Twitter(X) if you have any questions.

This article was originally published on Data Leads Future.

The post Exploring Numexpr: A Powerful Engine Behind Pandas appeared first on Towards Data Science.

]]>