Ollama is a software project that makes it easy to run LLMs on your local machine. Running ollama run llama3 downloaded a 4-bit quantized model that could run on my MacBook M2, then dropped me into a CLI where I could enter prompts and watch responses stream into my terminal.

In the above clip, I'm running Meta's latest open-source model, Llama 3. These models are less powerful than OpenAI's models (mainly GPT-4o, GPT-4, and GPT-3.5 Turbo as of this writing), but they pack a serious punch. The fact that a model that runs on a laptop can get within throwing distance of GPT-3.5, which powered all of ChatGPT not long ago, is pretty insane.

There are a number of reasons you might want to run your own local AI instead of something like ChatGPT:

  1. Privacy. With ChatGPT, OpenAI has access to every query and response you send. When someone else controls your data, despite their best intentions, sometimes it will leak. In OpenAI's case, it has already happened at least once. Depending on the sensitivity of your prompts, this may be more or less of a concern.
  2. Flexibility. Running your own models lets you use special-purpose models better suited to individual tasks. You can also fine-tune open source models on your own hardware, which can be incredibly useful if you have a lot of organizational data you would like the model to recognize.

The project's README shows some of its capabilities:


REST API

Ollama has a REST API for running and managing models.

Generate a response

curl http://localhost:11434/api/generate -d '{
  "model": "llama3",
  "prompt":"Why is the sky blue?"
}'

Chat with a model

curl http://localhost:11434/api/chat -d '{
  "model": "llama3",
  "messages": [
    { "role": "user", "content": "why is the sky blue?" }
  ]
}'

See the API documentation for all endpoints.


We can try one of those curl commands and see what the responses look like:

{"model":"llama3","created_at":"2024-06-20T00:25:35.629748Z","response":"The","done":false}
{"model":"llama3","created_at":"2024-06-20T00:25:35.662856Z","response":" sky","done":false}
{"model":"llama3","created_at":"2024-06-20T00:25:35.695868Z","response":" appears","done":false}
{"model":"llama3","created_at":"2024-06-20T00:25:35.729704Z","response":" blue","done":false}
{"model":"llama3","created_at":"2024-06-20T00:25:35.763045Z","response":" because","done":false}
...

These stream in over time instead of being dumped out when the message is complete. Here's the code from ollama that's generating the stream:

func streamResponse(c *gin.Context, ch chan any) {
	c.Header("Content-Type", "application/x-ndjson")
	c.Stream(func(w io.Writer) bool {
		val, ok := <-ch
		if !ok {
			return false
		}

		bts, err := json.Marshal(val)
		if err != nil {
			slog.Info(fmt.Sprintf("streamResponse: json.Marshal failed with %s", err))
			return false
		}

		// Delineate chunks with new-line delimiter
		bts = append(bts, '\n')
		if _, err := w.Write(bts); err != nil {
			slog.Info(fmt.Sprintf("streamResponse: w.Write failed with %s", err))
			return false
		}

		return true
	})
}

application/x-ndjson means "newline-delimited JSON". This detail makes it easy to delineate between messages: since you know they are broken on newlines, you can use the line-reading functionality present in most languages to process them.
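
Since we'll be consuming exactly this kind of stream from Clojure later in the post, here's a minimal sketch of reading NDJSON line by line. It's illustrative only: input-stream is a placeholder for any InputStream carrying the response body.

(require '[clojure.data.json :as json]
         '[clojure.java.io :as io])

;; Each line is a complete JSON document, so plain line-reading is all
;; the framing we need.
(with-open [reader (io/reader input-stream)] ; input-stream is hypothetical
  (doseq [line (line-seq reader)]
    (println (:response (json/read-str line :key-fn keyword)))))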

A Simple ChatGPT Clone

For fun, we can build a simple ChatGPT-style web application on top of this API with a few components:

  • an HTML page that presents a form to collect a prompt with a button for submission, plus JavaScript to send the prompt, listen for the response, and write the chunks to the page as they stream in
  • a server with two endpoints:
    • one rendering the page above,
    • one that receives a prompt, forwards it to ollama, and streams the response back to the client.

With that in mind, let's get to work!

Go Implementation

We can do everything in Go using just the standard library.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"html/template"
	"io"
	"log"
	"net/http"
)

func main() {
	tmpl := template.Must(template.ParseFiles("index.html"))

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if err := tmpl.Execute(w, nil); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})

	http.HandleFunc("/api/generate", func(w http.ResponseWriter, r *http.Request) {
		var request struct {
			Model  string `json:"model"`
			Prompt string `json:"prompt"`
		}
		if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		payload := map[string]string{
			"model":  request.Model,
			"prompt": request.Prompt,
		}

		payloadBytes, err := json.Marshal(payload)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		resp, err := http.Post("http://localhost:11434/api/generate", "application/json", bytes.NewBuffer(payloadBytes))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer resp.Body.Close()

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)

		decoder := json.NewDecoder(resp.Body)
		for {
			var response map[string]interface{}
			if err := decoder.Decode(&response); err == io.EOF {
				break
			} else if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			if err := json.NewEncoder(w).Encode(response); err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.(http.Flusher).Flush()
		}
	})

	fmt.Println("Server is running on http://localhost:8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

Handling the chunked JSON is done in that last block with the decoder. The decoder is a state machine that handles its own buffering. The for loop decodes each value, checks for error conditions, and if none are found, re-encodes the value and flushes it to the client, which in this case is a JavaScript program running inside the browser. As the JavaScript program receives those messages, it writes them to the page.

Decoding and immediately re-encoding the result looks a little silly; we could have just pointed the client directly at ollama and left all of this out of the server. But in a more realistic deployment, we would not want clients interacting directly with the model: it would be important for the server to sit in between and manage the process.

[Screenshot: how the webapp looks while running]

That was all it took to build the Go implementation! I like that skimming the docs for the standard library modules that sound like what you need is all it really takes to get up and running in the language. The fact that I can do that when I'm not very experienced in Go is a testament to the skill of its design.

The rest of this post will cover building this same functionality in Clojure instead of Go.

Clojure - http-kit Implementation

Unlike Go, Clojure does not have a production-grade HTTP server and client in its standard library.

I used http-kit for this task because it was the smallest library I knew of that could single-handedly meet my requirements: it offers synchronous and asynchronous HTTP clients as well as a synchronous/asynchronous Ring-compliant web server. I also chose it for its reputation of being small and focused, with minimal dependencies.

In Clojure, Ring-compliant HTTP servers use handlers: functions that take a request map and return a response map containing keys for various HTTP constructs such as :status, :headers, and :body. I used this as a starting point and built a run-of-the-mill synchronous handler that serves the same page as the Go app above.

;; io here is clojure.java.io, from the ns :require
(defn read-html-template []
  (println "fetching html template")
  (slurp (io/resource "index.html")))

(defn index-handler [req]
  (println "in index handler")
  {:status  200
   :headers {"Content-Type" "text/html"}
   :body    (read-html-template)})
   
(defn not-found-handler [req]
  {:status  404
   :headers {"Content-Type" "text/plain"}
   :body    "Page not found."})
   
(defn app [req]
  (let [uri (:uri req)
        method (:request-method req)]
    (cond
      (and (= uri "/") (= method :get)) (index-handler req)
      :else (not-found-handler req)))) 

That all works because you can pass any arbitrary string as your response body; with the content type set to text/html, the browser renders it correctly.
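
To actually serve the app, we can use http-kit's run-server, which takes the handler plus an options map and returns a function that stops the server. Here's a sketch of how I run it from the REPL (the server_ atom and helper names are my own, and the port is arbitrary):

(defonce server_ (atom nil))

(defn start! []
  ;; Pass the var #'app so REPL redefinitions of app take effect immediately.
  (reset! server_ (http/run-server #'app {:port 8080})))

(defn stop! []
  (when-let [stop-fn @server_]
    (stop-fn) ; calling run-server's return value shuts the server down
    (reset! server_ nil)))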

At this point, the client has a JavaScript program that wants to send a prompt and listen for the streaming response. We'll want an asynchronous handler for this, so we'll use http-kit's as-channel function. It's not covered in the project's main documentation, but the docstring in the source has enough to get started with. Let's use it and wire it up.

(def clients_ (atom #{}))
(defn my-async-handler [ring-req]
  (http/as-channel ring-req
              {:on-open (fn [ch]
                          (println "conn open!")
                          (println ring-req)
                          (swap! clients_ conj ch))
               :on-close (fn [ch]
                           (println "conn close!")
                           (swap! clients_ disj ch))}))

(defn app [req]
  (let [uri (:uri req)
        method (:request-method req)]
    (cond
      (and (= uri "/") (= method :get)) (index-handler req)
      (and (= uri "/api/generate") (= method :post)) (my-async-handler req)
      :else (not-found-handler req))))

When I evaluate these forms, switch back to my browser, and hit the "Submit" button, I see the print statements fire. When I evaluate clients_, I see it now contains a new client.

To get the actual messages, we can start writing a function that sends a POST request to the ollama server. After that, we want a way to access the messages as they stream in and forward them to the client.

In typical asynchronous programming, this kind of message passing is done with callbacks. http-kit has its own channel concept, separate from core.async, with its own semantics. When we get a message, we will want to send it to the client like this:

(http/send! ch {:status 200
                :headers {"Content-Type" "application/json"}
                :body json-encoded}
            false) ; close-after-send? false keeps the channel open for more sends

To kick off the HTTP request to ollama with the prompt, we want to use http-kit's client functionality. There is documentation on making asynchronous requests with callbacks:

;fire and forget, returns immediately[1], returned promise is ignored
(http/get "http://host.com/path")

(def options {:timeout 200             ; ms
              :basic-auth ["user" "pass"]
              :query-params {:param "value" :param2 ["value1" "value2"]}
              :user-agent "User-Agent-string"
              :headers {"X-Header" "Value"}})
(http/get "http://host.com/path" options
          (fn [{:keys [status headers body error]}] ;; asynchronous response handling
            (if error
              (println "Failed, exception is " error)
              (println "Async HTTP GET: " status))))
 ; [1] may not always true, since DNS lookup maybe slow

This kind of works, but the callback only fires once all the messages have been received. We're looking for some way to access the messages as they stream in.

I looked at the source for help and found the following. Without guidance from the documentation, the :stream option sounded like a good one to try.

  Returned body type is controlled by `:as` option:

   Without automatic unzipping:
     `:none`           - org.httpkit.DynamicBytes
     `:raw-byte-array` - bytes[]

   With automatic unzipping:
     `:byte-array`     - bytes[]
     `:stream`         - BytesInputStream
     `:text`           - String (charset based on Content-Type header)
     `:auto`           - As `:text` or `:stream` (based on Content-Type header)

There's no further mention of how to use a BytesInputStream in the docs, so we can check the source for that.

package org.httpkit;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

/**
 * No synchronization, better toString
 */
public class BytesInputStream extends InputStream {
    private final byte[] buf;
    private final int count;
    private int mark = 0;

    private int pos;

    public BytesInputStream(byte[] data, int length) {
        this.buf = data;
        this.count = length;
        this.pos = 0;
    }

    /**
     * get the underlying bytes, copied
     *
     * @return
     */
    public byte[] bytes() {
        return Arrays.copyOf(buf, count);
    }

    public int read() throws IOException {
        return (pos < count) ? (buf[pos++] & 0xff) : -1;
    }
...

Since this class subclasses InputStream, it can be read by an InputStreamReader.

The docs for InputStreamReader recommend wrapping it with a BufferedReader, so let's do that; we want to read the input line by line.

;; client aliases org.httpkit.client, http aliases org.httpkit.server
(defn send-prompt-to-ollama [ch prompt]
  (let [url "http://localhost:11434/api/generate"]
    (client/post url
                 {:as :stream
                  :headers {"Content-Type" "application/json"}
                  :body (json/write-str {:model "llama3" :prompt prompt})}
                 (fn [{:keys [status headers body error]}]
                   (if error
                     (do
                       (http/send! ch {:status 500 :body (str "Internal Server Error: " error)})
                       (http/close ch))
                     (let [stream ^java.io.InputStream body
                           reader (java.io.BufferedReader. (java.io.InputStreamReader. stream "UTF-8"))]
                       (loop []
                         (let [line (.readLine reader)]
                           (if (nil? line)
                             (do
                               (.close reader)
                               (http/close ch))
                             (do
                               (let [response (:response (json/read-json line))
                                     json-encoded (json/write-str {:response response})]
                                 (http/send! ch {:status 200
                                                 :headers {"Content-Type" "application/json"}
                                                 :body json-encoded}
                                             false))
                               (recur)))))))))))

Unfortunately, after writing this up, I observed the same behavior as earlier: responses were not streamed in, but rather returned all at once.

This turns out to be a known limitation of http-kit's client functionality. The comment thread mentions that clj-http works for this use case, but part of the reason I went with http-kit in the first place was to minimize dependencies.

Another option to try without introducing a new dependency is to interop with java.net.http.HttpClient, which has been included with the JDK since Java 11.

Here's how I initially got that working:

(import (java.net.http HttpClient HttpRequest HttpResponse HttpResponse$BodyHandlers HttpRequest$BodyPublishers)
        (java.net URI)
        (java.nio.charset StandardCharsets)
        (java.io InputStreamReader BufferedReader)
        (java.util.concurrent CompletableFuture))
        
(defn handle-response [ch response]
  (with-open [reader (BufferedReader. (InputStreamReader. (.body response) StandardCharsets/UTF_8))]
    (loop []
      (let [line (.readLine reader)]
        (if (nil? line)
          (do
            (.close reader)
            (http/close ch))
          (do
            (println "got line" line)
            (let [response (:response (json/read-json line))
                  json-encoded (json/write-str {:response response})]
              (println "sending response" response)
              (http/send! ch {:status 200
                              :headers {"Content-Type" "application/json"}
                              :body json-encoded}
                          false))
            (recur)))))))

(defn send-async-request [ch model prompt]
  (let [client (HttpClient/newHttpClient)
        body (str "{\"model\":\"" model "\", \"prompt\":\"" prompt "\"}")
        request (-> (HttpRequest/newBuilder)
                    (.uri (URI/create "http://localhost:11434/api/generate"))
                    (.header "Content-Type" "application/json")
                    (.POST (HttpRequest$BodyPublishers/ofString body))
                    (.build))]
    (-> (.sendAsync client request (HttpResponse$BodyHandlers/ofInputStream))
        (.thenAccept (reify java.util.function.Consumer
                       (accept [_ response]
                         (handle-response ch response))))
        (.exceptionally (reify java.util.function.Function
                          (apply [_ error]
                            (println (str "Request failed: " error))
                            nil))))))

This gave me the behavior I was looking for: responses were sent to the client as soon as they came in.

The code is a little noisy because the library uses Java idioms that were introduced after Clojure was created.

Luckily, Clojure's maintainers recently introduced language changes that make using these features less painful. From the release notes of Clojure 1.12.0-alpha12:

Functional interfaces

Java programs define "functions" with Java functional interfaces (marked with the @FunctionalInterface annotation), which have a single method.

Clojure developers can now invoke Java methods taking functional interfaces by passing functions with matching arity. The Clojure compiler implicitly converts Clojure functions to the required functional interface by constructing a lambda adapter. You can explicitly coerce a Clojure function to a functional interface by hinting the binding name in a let binding, e.g. to avoid repeated adapter construction in a loop.

See: CLJ-2799
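
To make this concrete, here's a small sketch of both forms the notes describe, runnable once we're on 1.12: passing a plain Clojure fn where Java expects a java.util.function.Function, and explicitly coercing one via a hinted let binding. (The CompletableFuture example is my own, chosen because it mirrors the .thenAccept call above.)

(import (java.util.concurrent CompletableFuture))

;; Implicit: the compiler adapts the Clojure fn into the
;; java.util.function.Function that .thenApply expects.
(-> (CompletableFuture/completedFuture 20)
    (.thenApply (fn [x] (* 2 x)))
    (.get)) ; => 40

;; Explicit: hinting the binding coerces the fn once, which the notes
;; suggest for avoiding repeated adapter construction in a loop.
(let [^java.util.function.Function f inc]
  (.apply f 41)) ; => 42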

To take advantage of these changes, we can use a preview build of the language by updating the language version in deps.edn:

org.clojure/clojure       {:mvn/version "1.12.0-beta1"}

After doing so, we can change the above functions to this:

(defn handle-response [ch response]
  (with-open [reader (BufferedReader. (InputStreamReader. (.body response) StandardCharsets/UTF_8))]
    (loop []
      (let [line (.readLine reader)]
        (if (nil? line)
          (http/close ch) ; with-open closes the reader for us
          (do
            (let [response (:response (json/read-json line))
                  json-encoded (json/write-str {:response response})]
              (http/send! ch {:status 200
                              :headers {"Content-Type" "application/json"}
                              :body json-encoded}
                          false))
            (recur)))))))

(defn send-async-request [ch model prompt]
  (let [client (HttpClient/newHttpClient)
        body (str "{\"model\":\"" model "\", \"prompt\":\"" prompt "\"}")
        request (-> (HttpRequest/newBuilder)
                    (.uri (URI/create "http://localhost:11434/api/generate"))
                    (.header "Content-Type" "application/json")
                    (.POST (HttpRequest$BodyPublishers/ofString body))
                    (.build))]
    (-> (.sendAsync client request (HttpResponse$BodyHandlers/ofInputStream))
        (.thenAccept (fn [response] (handle-response ch response)))
        (.exceptionally (fn [error] (println (str "Request failed: " error)))))))

From here, I just needed to wire up the request logic to my async handler and then update my main handler to route to it.

(defn my-async-handler [ring-req]
  (let [body (slurp (:body ring-req))
        prompt (try
                 (:prompt (json/read-json body))
                 (catch Exception e
                   (println "Error parsing request body:" e)
                   nil))]
    (if prompt
      (http/as-channel ring-req
                       {:on-open (fn [ch]
                                   (println "conn open!")
                                   (swap! clients_ conj ch)
                                   (send-async-request ch "llama3" prompt))
                        :on-close (fn [ch]
                                    (println "conn close!")
                                    (swap! clients_ disj ch))})
      {:status 400
       :headers {"Content-Type" "application/json"}
       :body (json/write-str {:error "Invalid request"})})))

(defn app [req]
  (let [uri (:uri req)
        method (:request-method req)]
    (cond
      (and (= uri "/") (= method :get)) (index-handler req)
      (and (= uri "/api/generate") (= method :post)) (#'my-async-handler req)
      :else (not-found-handler req))))

Clojure - Pedestal Implementation

I was hoping this project would be a good place to use core.async, which I recently started using, but I realized while working through the previous implementation that http-kit has its own semantics for channels; it didn't really make sense to use core.async there.

I heard that Pedestal was built with core.async in mind, so I ended up throwing together another implementation with it.

In this version, my request function sends a request with the user's prompt and returns a core.async channel onto which the messages are queued.

(defn handle-response [response-ch ^HttpResponse response]
  (let [reader (BufferedReader. (InputStreamReader. (.body response) StandardCharsets/UTF_8))]
    ;; .readLine blocks, so run the loop on a real thread via a/thread
    ;; rather than inside a go block, which shouldn't do blocking IO.
    (a/thread
      (loop []
        (if-let [line (.readLine reader)]
          (do
            (println line)
            (a/>!! response-ch line)
            (recur))
          (do
            (.close reader)
            (a/close! response-ch)))))))

;; generate-string / parse-string come from cheshire.core
(defn send-async-request [model prompt]
  (let [client (HttpClient/newHttpClient)
        body (generate-string {:model model :prompt prompt})
        request (-> (HttpRequest/newBuilder)
                    (.uri (URI/create "http://localhost:11434/api/generate"))
                    (.header "Content-Type" "application/json")
                    (.POST (HttpRequest$BodyPublishers/ofString body))
                    (.build))
        response-ch (a/chan)]
    (-> (.sendAsync client request (HttpResponse$BodyHandlers/ofInputStream))
        (.thenAccept (fn [response] (handle-response response-ch response)))
        (.exceptionally (fn [error] (println (str "Request failed: " error)))))
    response-ch))

I used Pedestal's Server-Sent Events functionality to make it work. The JSON interceptor is necessary to get access to the JSON params sent from the client to the asynchronous endpoint.

start-event-stream takes a stream-ready callback and returns an interceptor. When the stream opens, the callback is invoked with an event channel, and anything you put on that channel is sent to the client; the channel's buffering is managed by the library. In my callback, I take messages off the response channel and put them onto the event channel.

(defn stream-ready [event-ch ctx]
  (let [{:keys [model prompt]} (-> ctx :request :json-params)
        response-chan (send-async-request model prompt)]
    (go-loop []
      (if-let [msg (<! response-chan)]
        (do
          (a/put! event-ch (generate-string {:response msg}))
          (recur))
        (a/close! event-ch)))))

(def my-json-interceptor
  {:name  ::my-json-interceptor
   :enter (fn [{:keys [request] :as ctx}]
            (if (#{:post :put} (:request-method request))
              (let [raw-body-str (slurp (:body request))
                    json-params (parse-string raw-body-str true)]
                (assoc-in (assoc-in ctx [:request :json-params] json-params)
                          [:request :raw-body-str] raw-body-str))
              ctx))})

(def routes
  #{["/" :get
     [index-handler]
     :route-name :index]
    ["/api/generate" :post
     [my-json-interceptor (sse/start-event-stream stream-ready)]
     :route-name :stream]})
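
For completeness, here's roughly how the service gets started. This is a sketch using Pedestal's standard service-map keys, with ::http aliasing io.pedestal.http; the helper names are mine and the port is arbitrary.

(require '[io.pedestal.http :as http]
         '[io.pedestal.http.route :as route])

(def service-map
  {::http/routes (route/expand-routes routes)
   ::http/type   :jetty
   ::http/port   8080
   ::http/join?  false}) ; don't block the REPL thread

(defonce server_ (atom nil))

(defn start! []
  (reset! server_ (-> service-map
                      http/create-server
                      http/start)))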

I think it makes a lot of sense to use Pedestal for async apps, considering core.async and Pedestal are maintained by the same folks (and I believe they use both libraries extensively in-house). That way you passively benefit from any improvements to core.async over time, and it seems like there's a lot of cool stuff you can do with core.async in general. It can be difficult to use, though, so you pretty much have to be willing to read the source and ask for help on the Clojurians Slack when needed.

For a project as small as this, Go was way easier to work with. I've written Clojure on and off over the last few years, and I still spent 10-20x longer getting those implementations working than the Go one. With Go, it's really easy to pick the first dumb implementation that pops into your head, and it will just work. And of course, trivial deployment is always a relief.

I do still reach for Clojure from time to time, partly because I enjoy the development process of building projects in small pieces without ceremony: you don't have to create mini-projects or something similar to experiment with new ideas, you just try them out from whatever file you're currently working in. I also find it much easier to maintain a bird's-eye view of the codebase I'm working on. Clojure code tends to be much higher level, and I find it easier to reason about when I'm focused, provided I've kept everything logically organized and somewhat tidy.