As WebRTC developers, we’ve gotten very good at moving real-time media around the globe. But often, the most exciting and valuable work happens when we stop just routing media and add some processing to it. The challenge is that building custom, real-time media processing workflows is often complex, requiring deep expertise and significant engineering effort.

That’s why we were thrilled to see the announcement of Juturna, a new open-source library from the brilliant minds at Meetecho, the creators of the Janus WebRTC Server.

Juturna is a lightweight, real-time data processing library written in Python, designed to tackle this exact problem. The choice of Python is one of its most strategic features, as it unlocks a massive and mature ecosystem of powerful libraries for AI, machine learning, and computer vision.

The Juturna Python library allows developers to easily build systems that can provide live transcription (leveraging tools like OpenAI’s Whisper), use AI for content moderation, or create “smart recordings” that automatically generate summaries. This immediate access to cutting-edge tools is a game changer, giving architects incredible flexibility to run ML models locally or interface with cloud-based AI services.

So what makes Juturna special, and how does it actually work? At its core, Juturna takes a fundamentally different approach to media processing. Instead of writing monolithic applications or wrestling with complex frameworks, you build pipelines by connecting simple, reusable components. Let’s break down how this assembly-line model transforms the way we think about real-time media processing.

Deconstructing Juturna: Your Media Assembly Line

The best way to think about Juturna is like a factory assembly line for your media. You define a series of stations (called nodes) that your media will pass through, with each station performing a specific task.

There are three fundamental types of nodes:

  • Sources: These are the inputs to your pipeline. This is where the media comes from, like a live RTP stream from your Janus server (or any other source of RTP streams), an FFmpeg feed, or even a pre-recorded file.
  • Processing Nodes: These are the “brains” of the operation where the real magic happens. A processing node takes in media, performs some transformation, and passes the result downstream. This could be anything from detecting motion in a video to transcribing audio.
  • Sinks: These are the outputs of your pipeline. This is where the processed data or media ends up, whether it’s being sent to an HTTP webhook, saved to a file, or streamed out to another RTP endpoint.

Best of all, the entire pipeline and all its nodes are defined using simple and readable JSON, making it incredibly accessible for developers to get started and prototype ideas quickly.

But Juturna isn’t just easy to use; it’s also designed to be highly extensible. You aren’t limited to the nodes that come out of the box. Developers can easily create their own custom source, processing, or sink nodes in Python to add any functionality they can imagine. 

This plugin-oriented architecture means the platform can grow and adapt to new technologies and use cases.
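
To give a sense of what that looks like in practice, here is a minimal sketch of a pass-through processing node, modeled on the BaseNode API used in the motion detection example later in this post (the node name, file path, and payload type are purely illustrative):

# plugins/nodes/proc/_frame_logger/frame_logger.py (hypothetical)
from juturna.components import BaseNode, Message
from juturna.payloads import ImagePayload

class FrameLogger(BaseNode[ImagePayload, ImagePayload]):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.frame_count = 0

    def update(self, message: Message[ImagePayload]):
        # Called for every message arriving from upstream: count the frame
        # and forward the payload downstream unchanged.
        self.frame_count += 1
        self.logger.info(f'saw frame {self.frame_count}')
        self.transmit(Message(payload=message.payload))

Drop a node like this into your plugins folder, reference its mark in the pipeline JSON, and Juturna takes care of wiring it to its neighbors.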

Putting It All Together: A Simple Motion Detector in Action

To see how these pieces fit together, let’s sketch out what a simple motion detection pipeline would look like using Juturna. Our goal is to analyze a live video stream and send an alert to a backend whenever motion is detected. The whole project is available in the juturna-motion-detector repo on GitHub.

First, we’d define our building blocks in a simple_pipeline.json file. This tells Juturna about the different “stations” on our assembly line and where to find them (via "plugins": ["./plugins"]).

/* simple_pipeline.json */
{
  "version": "0.1.0",
  "plugins": ["./plugins"],
  "pipeline": {
    "name": "motion_detection_pipeline",
    "id": "motion-001",
    "folder": "./running_pipelines",
    "nodes": [
      {
        "name": "video_source",
        "type": "source",
        "mark": "video_rtp_custom",
        "configuration": {
          "rec_host": "172.21.0.10",
          "rec_port": 8004,
          "payload_type": 96,
          "codec": "VP8",
          "width": 640,
          "height": 480
        }
      },
      {
        "name": "motion_detector",
        "type": "proc",
        "mark": "motion_detection",
        "configuration": {
          "threshold": 0.3,
          "min_area": 500,
          "sensitivity": 25
        }
      },
      {
        "name": "http_notifier",
        "type": "sink",
        "mark": "http_notifier_custom",
        "configuration": {
          "endpoint": "http://172.21.0.40/motion-detected",
          "timeout": 10,
          "content_type": "application/json"
        }
      }
    ],
    "links": [
      {
        "from": "video_source",
        "to": "motion_detector"
      },
      {
        "from": "motion_detector",
        "to": "http_notifier"
      }
    ]
  }
}
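
The "plugins" entry tells Juturna where to look for these custom nodes. Judging from the module path used for the motion detector below, each custom node lives in its own folder named after its mark (prefixed with an underscore), roughly like this:

plugins/
└── nodes/
    └── proc/
        └── _motion_detection/
            └── motion_detection.py

The custom source and sink nodes referenced by the video_rtp_custom and http_notifier_custom marks would sit alongside it, following the same convention.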

Next, we create our nodes or leverage one of the many available out of the box. For example, our motion_detection node is defined as follows:

# plugins/nodes/proc/_motion_detection/motion_detection.py
import cv2
import numpy as np
from juturna.components import BaseNode, Message
from juturna.payloads import ImagePayload

class MotionDetection(BaseNode[ImagePayload, ImagePayload]):
    def __init__(self, threshold=0.3, min_area=500, sensitivity=25, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.min_area = min_area
        self.sensitivity = sensitivity
        self.background_subtractor = cv2.createBackgroundSubtractorMOG2(
            detectShadows=True
        )
        self.frame_count = 0
        self.motion_count = 0
        self.logger.info(
            f'Motion detection node initialized: '
            f'threshold={threshold}, min_area={min_area}')
        
    def update(self, message: Message[ImagePayload]):
        self.frame_count += 1
        frame = message.payload.image
        
        ...
            
        # Convert to grayscale for motion detection
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Apply background subtraction
        fg_mask = self.background_subtractor.apply(gray)
        
        # Find contours
        contours, _ = cv2.findContours(
            fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        
        motion_detected = False
        motion_areas = []
        
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > self.min_area:
                motion_detected = True
                x, y, w, h = cv2.boundingRect(contour)
                motion_areas.append({
                    'x': int(x), 'y': int(y), 
                    'width': int(w), 'height': int(h),
                    'area': int(area)
                })
        
        ...
        
        # Only send notification when motion is detected
        if motion_detected:
            self.motion_count += 1
            self.logger.info(
                f"Motion detected in frame {self.frame_count}: "
                f"{len(motion_areas)} areas")
            
            new_payload = ImagePayload()
            new_payload.image = frame
            new_payload.width = frame.shape[1]
            new_payload.height = frame.shape[0]
            new_payload.depth = frame.shape[2]
            new_payload.pixel_format = 'BGR'
            new_payload.motion_detected = motion_detected
            new_payload.motion_areas = motion_areas
            
            to_send = Message(payload=new_payload)
            self.transmit(to_send)
        else:
            # Log occasionally when no motion
            if self.frame_count % 100 == 0:
                self.logger.debug(f"No motion in frame {self.frame_count}")
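
The http_notifier_custom sink referenced in the pipeline follows the same pattern, except that a sink has no downstream node to transmit to. A minimal sketch of what it could look like (the constructor signature, type parameters, payload fields, and use of the requests library here are assumptions, so the actual node in the repo may differ):

# plugins/nodes/sink/_http_notifier_custom/http_notifier_custom.py (sketch)
import requests

from juturna.components import BaseNode, Message
from juturna.payloads import ImagePayload

class HttpNotifierCustom(BaseNode[ImagePayload, None]):
    def __init__(self, endpoint, timeout=10,
                 content_type='application/json', **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.timeout = timeout
        self.content_type = content_type

    def update(self, message: Message[ImagePayload]):
        # Turn the motion metadata attached by the upstream node into a JSON alert
        payload = message.payload
        body = {
            'motion_detected': getattr(payload, 'motion_detected', False),
            'motion_areas': getattr(payload, 'motion_areas', []),
            'width': payload.width,
            'height': payload.height,
        }
        try:
            requests.post(self.endpoint, json=body,
                          headers={'Content-Type': self.content_type},
                          timeout=self.timeout)
        except requests.RequestException as exc:
            self.logger.warning(f'failed to notify backend: {exc}')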

And finally, we load our pipeline and run it from our script:

#!/usr/bin/env python3
# main.py

import time
import sys
import logging
import juturna as jt

def main():
    ...    
    try:
        pipeline = jt.components.Pipeline.from_json('simple_pipeline.json')
        print(f"Starting pipeline: {pipeline.name}")
        
        pipeline.warmup()
        pipeline.start()
        print("Motion detection pipeline started")
        
        while True:
            time.sleep(30)
            
    except Exception as e:
        print(f"Pipeline error: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        print("Stopping pipeline...")
        pipeline.stop()
        pipeline.destroy()

if __name__ == "__main__":
    main()

And that’s it!

When Juturna runs with this configuration, it listens for an incoming RTP video stream. As soon as the motion_detector node detects movement, it triggers the http_notifier sink, which sends a notification to the /motion-detected endpoint on our backend.

This simple example powerfully demonstrates Juturna’s core value: it handles all the complex plumbing, letting developers focus on their unique processing logic.

Where Juturna Fits in the WebRTC Stack

Juturna is a powerful tool that you can connect to your media server, like Janus, to provide the off-ramp for advanced processing and analysis.

With its simplicity, flexibility, and the power of the Python ecosystem, Juturna is an excellent choice for rapidly prototyping and implementing sophisticated features in any WebRTC application. Because it’s both open-source and extensible, we are excited to see how the community will build upon it to create the next generation of intelligent, real-time communication tools.

Have an idea for a complex media workflow? The team at WebRTC.ventures is always exploring the cutting edge of what’s possible with live video and audio. Get in touch with our experts to discuss how we can bring your vision to life.

