Our Blog

Edge Computing com Linux: Preparando Servidores para IoT e 5G

Edge Computing está transformando a forma como processamos dados e entregamos serviços em tempo real. Com o Linux como plataforma dominante para 68% dos dispositivos IoT e a expansão massiva do 5G, preparar servidores Linux para edge computing tornou-se uma competência essencial em 2025.

O Que é Edge Computing

Edge Computing move o processamento de dados para mais próximo de onde os dados são gerados, reduzindo latência e largura de banda necessária. Em ambientes Linux, isso significa:

  • Processamento Local: Dados processados no local onde são coletados
  • Latência Ultra-Baixa: Tempos de resposta de menos de 5ms
  • Autonomia: Funcionamento independente mesmo sem conexão com a nuvem
  • Escalabilidade Distribuída: Milhares de pontos de processamento

Configuração de Servidor Edge com Linux

Instalação Base Otimizada

#!/bin/bash
# edge-setup.sh - Setup completo para servidor edge

set -e

echo "=== Edge Computing Server Setup ==="

# Atualizar sistema
sudo apt update && sudo apt upgrade -y

# Instalar dependências essenciais
sudo apt install -y \
    docker.io \
    docker-compose \
    mosquitto \
    mosquitto-clients \
    redis-server \
    influxdb \
    grafana \
    nginx \
    fail2ban \
    ufw \
    python3-pip \
    git \
    htop \
    iotop

# Instalar bibliotecas Python para edge
pip3 install paho-mqtt influxdb redis tensorflow-lite opencv-python

# Configurar Docker para edge
sudo usermod -aG docker $USER
sudo systemctl enable docker
sudo systemctl start docker

# Configurar firewall otimizado para edge
sudo ufw allow ssh
sudo ufw allow 1883  # MQTT
sudo ufw allow 8086  # InfluxDB
sudo ufw allow 3000  # Grafana
sudo ufw allow 80    # HTTP
sudo ufw allow 443   # HTTPS
sudo ufw --force enable

# Otimizações de kernel para baixa latência
echo 'vm.swappiness=1' | sudo tee -a /etc/sysctl.conf
echo 'net.core.rmem_max = 16777216' | sudo tee -a /etc/sysctl.conf
echo 'net.core.wmem_max = 16777216' | sudo tee -a /etc/sysctl.conf
echo 'net.ipv4.tcp_congestion_control = bbr' | sudo tee -a /etc/sysctl.conf

echo "Edge server setup completed!"

Gateway IoT com MQTT

Configuração Mosquitto para Edge

# /etc/mosquitto/mosquitto.conf
pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information

# Listeners otimizados para edge
listener 1883 0.0.0.0
protocol mqtt

listener 8883 0.0.0.0
protocol mqtt
cafile /etc/ssl/certs/ca-certificates.crt

listener 9001 0.0.0.0
protocol websockets

# Performance para edge computing
max_connections 1000
max_inflight_messages 100
max_queued_messages 1000
message_size_limit 10240

# Bridge para cloud (quando disponível)
connection bridge-to-cloud
address cloud.empresa.com:8883
topic # out 0
topic # in 0
bridge_attempt_unsubscribe false
bridge_clean_session false
notifications false
try_private false

Coletor de Dados IoT em Python

#!/usr/bin/env python3
# edge-data-collector.py

import paho.mqtt.client as mqtt
import json
import time
import threading
from datetime import datetime
from influxdb import InfluxDBClient
import redis
import logging

class EdgeDataCollector:
    def __init__(self):
        self.mqtt_client = mqtt.Client()
        self.influxdb = InfluxDBClient(host='localhost', port=8086, database='edge_data')
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        
        # Configurar logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # Configurar MQTT callbacks
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_message = self.on_message
        
        # Buffer para processamento em lote
        self.data_buffer = []
        self.buffer_lock = threading.Lock()
        
        # Thread de processamento
        self.processing_thread = threading.Thread(target=self.process_data_buffer)
        self.processing_thread.daemon = True
        self.processing_thread.start()
    
    def on_connect(self, client, userdata, flags, rc):
        self.logger.info(f"Connected to MQTT broker with result code {rc}")
        
        # Subscribe para tópicos IoT
        topics = [
            ("sensors/+/temperature", 0),
            ("sensors/+/humidity", 0),
            ("devices/+/status", 0),
            ("cameras/+/detection", 0),
            ("vehicles/+/location", 0)
        ]
        
        for topic, qos in topics:
            client.subscribe(topic, qos)
    
    def on_message(self, client, userdata, msg):
        try:
            topic_parts = msg.topic.split('/')
            device_type = topic_parts[0]
            device_id = topic_parts[1]
            metric_type = topic_parts[2]
            
            payload = json.loads(msg.payload.decode())
            
            # Criar ponto de dados
            data_point = {
                "measurement": f"{device_type}_{metric_type}",
                "tags": {
                    "device_id": device_id,
                    "device_type": device_type,
                    "location": payload.get("location", "unknown")
                },
                "fields": {
                    "value": payload.get("value"),
                    "quality": payload.get("quality", 1.0)
                },
                "time": datetime.utcnow().isoformat()
            }
            
            # Adicionar ao buffer
            with self.buffer_lock:
                self.data_buffer.append(data_point)
            
            # Processar alertas críticos
            self.process_real_time_alerts(device_type, device_id, payload)
            
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")
    
    def process_real_time_alerts(self, device_type, device_id, payload):
        """Alertas que requerem resposta < 100ms"""
        value = payload.get("value", 0)
        
        if device_type == "sensors":
            if "temperature" in payload and value > 80:
                self.send_critical_alert("HIGH_TEMPERATURE", device_id, value)
            elif "pressure" in payload and value > 100:
                self.send_critical_alert("HIGH_PRESSURE", device_id, value)
                
        elif device_type == "vehicles":
            speed = payload.get("speed", 0)
            if speed > 120:
                self.send_critical_alert("SPEEDING", device_id, speed)
    
    def send_critical_alert(self, alert_type, device_id, value):
        """Envia alerta crítico para resposta imediata"""
        alert = {
            "timestamp": datetime.utcnow().isoformat(),
            "type": alert_type,
            "device_id": device_id,
            "value": value,
            "severity": "CRITICAL",
            "requires_action": True
        }
        
        # Publicar para sistemas de resposta automática
        self.mqtt_client.publish("alerts/critical", json.dumps(alert), qos=2)
        
        # Cache para sistemas de monitoramento
        alert_key = f"alert:critical:{device_id}:{int(time.time())}"
        self.redis_client.setex(alert_key, 3600, json.dumps(alert))
        
        self.logger.critical(f"CRITICAL ALERT: {alert_type} for {device_id}: {value}")
    
    def process_data_buffer(self):
        """Processa dados em lote otimizado"""
        while True:
            time.sleep(2)  # Processar a cada 2 segundos para edge
            
            if not self.data_buffer:
                continue
                
            with self.buffer_lock:
                batch = self.data_buffer.copy()
                self.data_buffer.clear()
            
            if batch:
                try:
                    self.influxdb.write_points(batch, batch_size=1000)
                    self.logger.info(f"Processed {len(batch)} data points")
                except Exception as e:
                    self.logger.error(f"Error writing batch: {e}")
    
    def start(self):
        self.mqtt_client.connect("localhost", 1883, 60)
        self.mqtt_client.loop_forever()

if __name__ == "__main__":
    collector = EdgeDataCollector()
    collector.start()

Machine Learning na Edge

Inferência com TensorFlow Lite

#!/usr/bin/env python3
# edge-ml-processor.py

import tensorflow as tf
import numpy as np
import cv2
import time
from threading import Thread
import queue

class EdgeMLProcessor:
    def __init__(self, model_path):
        # Carregar modelo otimizado para edge
        self.interpreter = tf.lite.Interpreter(
            model_path=model_path,
            num_threads=4  # Otimizado para hardware edge
        )
        self.interpreter.allocate_tensors()
        
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        
        # Fila para processamento assíncrono
        self.inference_queue = queue.Queue(maxsize=50)
        self.result_queue = queue.Queue()
        
        # Thread pool para processamento paralelo
        self.workers = []
        for i in range(2):  # 2 workers para edge
            worker = Thread(target=self.process_inference_queue)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
    
    def run_inference(self, input_data):
        """Executa inferência otimizada"""
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        
        start_time = time.perf_counter()
        self.interpreter.invoke()
        inference_time = (time.perf_counter() - start_time) * 1000  # ms
        
        output_data = self.interpreter.get_tensor(self.output_details[0]['index'])
        
        return output_data, inference_time
    
    def process_inference_queue(self):
        """Worker thread para processar inferências"""
        while True:
            try:
                task = self.inference_queue.get(timeout=1)
                image_data, metadata, callback = task
                
                result, inference_time = self.run_inference(image_data)
                
                # Processar resultado
                prediction = {
                    'class_id': int(np.argmax(result)),
                    'confidence': float(np.max(result)),
                    'inference_time_ms': inference_time,
                    'timestamp': time.time(),
                    'metadata': metadata
                }
                
                if callback:
                    callback(prediction)
                else:
                    self.result_queue.put(prediction)
                    
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Inference error: {e}")
    
    def predict_async(self, image_data, metadata=None, callback=None):
        """Inferência assíncrona não-bloqueante"""
        try:
            self.inference_queue.put_nowait((image_data, metadata, callback))
            return True
        except queue.Full:
            print("Inference queue full - dropping frame")
            return False

# Processador de câmera para detecção em tempo real
class EdgeCameraProcessor:
    def __init__(self, ml_processor, camera_id=0):
        self.ml_processor = ml_processor
        self.camera = cv2.VideoCapture(camera_id)
        
        # Configurar câmera para edge
        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        self.camera.set(cv2.CAP_PROP_FPS, 30)
        self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reduzir buffer
        
        self.frame_count = 0
        self.detection_count = 0
        self.avg_latency = 0
    
    def process_video_stream(self):
        """Processa stream de vídeo otimizado para edge"""
        print("Starting edge camera processing...")
        
        while True:
            ret, frame = self.camera.read()
            if not ret:
                continue
            
            self.frame_count += 1
            
            # Processar apenas frames específicos para otimizar
            if self.frame_count % 3 != 0:  # Processa 1 a cada 3 frames
                continue
            
            # Pré-processar frame
            processed_frame = self.preprocess_frame(frame)
            
            # Metadados do frame
            metadata = {
                'timestamp': time.time(),
                'frame_number': self.frame_count,
                'camera_id': 0
            }
            
            # Enviar para inferência com callback
            self.ml_processor.predict_async(
                processed_frame, 
                metadata, 
                self.handle_detection
            )
    
    def preprocess_frame(self, frame):
        """Pré-processamento otimizado"""
        # Redimensionar para modelo
        frame = cv2.resize(frame, (224, 224))
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame = np.expand_dims(frame, axis=0)
        frame = frame.astype(np.float32) / 255.0
        return frame
    
    def handle_detection(self, detection):
        """Processa detecções em tempo real"""
        self.detection_count += 1
        
        # Atualizar latência média
        self.avg_latency = (self.avg_latency * (self.detection_count - 1) + 
                           detection['inference_time_ms']) / self.detection_count
        
        # Processar apenas detecções com alta confiança
        if detection['confidence'] > 0.7:
            print(f"Detection {self.detection_count}: "
                  f"Class {detection['class_id']}, "
                  f"Confidence: {detection['confidence']:.2f}, "
                  f"Latency: {detection['inference_time_ms']:.1f}ms, "
                  f"Avg: {self.avg_latency:.1f}ms")
            
            # Enviar alertas para detecções críticas
            if detection['class_id'] in [1, 2, 15]:  # Pessoa, carro, gato
                self.send_detection_alert(detection)
    
    def send_detection_alert(self, detection):
        """Envia alerta de detecção via MQTT"""
        import paho.mqtt.client as mqtt
        
        client = mqtt.Client()
        client.connect("localhost", 1883, 60)
        
        alert = {
            'type': 'object_detection',
            'class_id': detection['class_id'],
            'confidence': detection['confidence'],
            'timestamp': detection['timestamp'],
            'camera_id': detection['metadata']['camera_id']
        }
        
        client.publish("alerts/detection", json.dumps(alert))
        client.disconnect()

# Exemplo de uso
if __name__ == "__main__":
    # Inicializar processador ML
    ml_processor = EdgeMLProcessor("/opt/models/mobilenet_v2.tflite")
    
    # Inicializar processador de câmera
    camera_processor = EdgeCameraProcessor(ml_processor)
    
    # Iniciar processamento
    camera_processor.process_video_stream()

Configuração para 5G e IoT

Configuração de Rede 5G

# /etc/netplan/01-5g-config.yaml
network:
  version: 2
  ethernets:
    eth0:
      dhcp4: true
      dhcp4-overrides:
        use-routes: false
      routes:
        - to: 10.0.0.0/8
          via: 192.168.1.1
          metric: 100
  
  modems:
    wwan0:
      apn: "5g.empresa.com"
      username: "edge-device"
      password: "password"
      pin: "1234"
      dhcp4: true
      routes:
        - to: 0.0.0.0/0
          via: 10.0.0.1
          metric: 200

Script de Monitoramento de Conectividade

#!/bin/bash
# connectivity-monitor.sh

LOG_FILE="/var/log/edge-connectivity.log"
INTERFACES=("eth0" "wwan0")
CRITICAL_SERVICES=("mosquitto" "docker" "influxdb")

log_event() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

check_interface_status() {
    local interface=$1
    
    if ip link show $interface &>/dev/null; then
        local status=$(cat /sys/class/net/$interface/operstate 2>/dev/null)
        local carrier=$(cat /sys/class/net/$interface/carrier 2>/dev/null)
        
        if [[ "$status" == "up" && "$carrier" == "1" ]]; then
            return 0
        fi
    fi
    return 1
}

check_internet_connectivity() {
    local targets=("8.8.8.8" "1.1.1.1")
    
    for target in "${targets[@]}"; do
        if ping -c 1 -W 2 $target &>/dev/null; then
            return 0
        fi
    done
    return 1
}

measure_latency() {
    local target="8.8.8.8"
    local latency=$(ping -c 5 $target 2>/dev/null | tail -1 | awk -F '/' '{print $5}')
    
    if [[ -n "$latency" ]]; then
        echo $latency
    else
        echo "999"
    fi
}

check_service_status() {
    local service=$1
    
    if systemctl is-active --quiet $service; then
        return 0
    else
        log_event "WARNING: Service $service is not running"
        systemctl restart $service
        sleep 5
        
        if systemctl is-active --quiet $service; then
            log_event "INFO: Service $service restarted successfully"
            return 0
        else
            log_event "ERROR: Failed to restart service $service"
            return 1
        fi
    fi
}

# Loop principal
while true; do
    log_event "=== Edge Connectivity Check ==="
    
    # Verificar interfaces de rede
    active_interfaces=0
    for interface in "${INTERFACES[@]}"; do
        if check_interface_status $interface; then
            log_event "OK: Interface $interface is up"
            active_interfaces=$((active_interfaces + 1))
        else
            log_event "ERROR: Interface $interface is down"
        fi
    done
    
    # Verificar conectividade à internet
    if check_internet_connectivity; then
        latency=$(measure_latency)
        log_event "OK: Internet connectivity (latency: ${latency}ms)"
        
        if (( $(echo "$latency > 100" | bc -l) )); then
            log_event "WARNING: High latency detected: ${latency}ms"
        fi
    else
        log_event "ERROR: No internet connectivity"
        
        # Tentar recuperação automática
        if [[ $active_interfaces -eq 0 ]]; then
            log_event "Attempting network recovery..."
            systemctl restart NetworkManager
            sleep 15
        fi
    fi
    
    # Verificar serviços críticos
    for service in "${CRITICAL_SERVICES[@]}"; do
        check_service_status $service
    done
    
    # Verificar uso de CPU e memória
    cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)
    mem_usage=$(free | grep Mem | awk '{printf "%.1f", $3/$2 * 100.0}')
    
    if (( $(echo "$cpu_usage > 80" | bc -l) )); then
        log_event "WARNING: High CPU usage: ${cpu_usage}%"
    fi
    
    if (( $(echo "$mem_usage > 85" | bc -l) )); then
        log_event "WARNING: High memory usage: ${mem_usage}%"
    fi
    
    sleep 30
done

Casos de Uso Práticos

1. Smart Factory – Monitoramento Industrial

Configuração para linha de produção:

  • Sensores: Temperatura, vibração, pressão, qualidade
  • Processamento: Detecção de anomalias < 50ms
  • Ações: Parada automática, alertas, ajustes
  • Conectividade: 5G privado + Ethernet redundante

2. Smart City – Gestão de Tráfego

# Configuração Docker para controle de tráfego
version: '3.8'
services:
  traffic-controller:
    image: traffic-control:latest
    restart: always
    environment:
      - INTERSECTION_ID=MAIN_ST_5TH_AVE
      - MQTT_BROKER=localhost:1883
      - ML_MODEL_PATH=/models/traffic-prediction.tflite
      - MAX_LATENCY_MS=50
    volumes:
      - /opt/edge/models:/models:ro
      - /opt/edge/config:/config:ro
    devices:
      - /dev/video0:/dev/video0  # Câmera de tráfego
    network_mode: host
    
  camera-processor:
    image: opencv-processor:latest
    restart: always
    environment:
      - CAMERA_COUNT=4
      - DETECTION_THRESHOLD=0.8
      - PROCESS_EVERY_NTH_FRAME=3
    devices:
      - /dev/video0:/dev/video0
      - /dev/video1:/dev/video1
      - /dev/video2:/dev/video2
      - /dev/video3:/dev/video3
    volumes:
      - /opt/edge/models:/models:ro

3. Autonomous Vehicles – Edge Processing

Sistema de processamento para veículos autônomos:

  • Sensores: LiDAR, câmeras, radar, GPS
  • Processamento: Fusão de sensores, planejamento de rota
  • Latência: < 10ms para decisões críticas
  • Redundância: Múltiplos processadores edge

Monitoramento e Métricas

Dashboard Grafana para Edge

# Configuração Prometheus para edge computing
# prometheus-edge.yml
global:
  scrape_interval: 5s
  evaluation_interval: 5s

scrape_configs:
  - job_name: 'edge-node'
    static_configs:
      - targets: ['localhost:9100']
    metrics_path: /metrics
    scrape_interval: 5s
    
  - job_name: 'mqtt-metrics'
    static_configs:
      - targets: ['localhost:9234']
      
  - job_name: 'container-metrics'
    static_configs:
      - targets: ['localhost:8080']
      
  - job_name: 'custom-edge-metrics'
    static_configs:
      - targets: ['localhost:8888']

rule_files:
  - "edge-alerts.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['localhost:9093']

Otimizações de Performance

Configurações do Kernel para Edge

# /etc/sysctl.d/99-edge-performance.conf

# Otimizações de rede para baixa latência
net.core.rmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_default = 262144
net.core.wmem_max = 16777216
net.core.netdev_max_backlog = 5000

# TCP otimizado para edge
net.ipv4.tcp_congestion_control = bbr
net.ipv4.tcp_rmem = 4096 65536 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216

# Reduzir latência de rede
net.ipv4.tcp_low_latency = 1
net.ipv4.tcp_timestamps = 0

# Otimizações de CPU
kernel.sched_migration_cost_ns = 500000
kernel.sched_min_granularity_ns = 10000000

# Gerenciamento de memória
vm.swappiness = 1
vm.dirty_ratio = 15
vm.dirty_background_ratio = 5

Segurança em Edge Computing

Configuração de Segurança Específica

# Configuração fail2ban para edge
# /etc/fail2ban/jail.d/edge-security.conf
[DEFAULT]
bantime = 3600
findtime = 600
maxretry = 3

[sshd]
enabled = true
port = ssh
filter = sshd
logpath = /var/log/auth.log
maxretry = 3

[mqtt-brute]
enabled = true
port = 1883
filter = mqtt-auth
logpath = /var/log/mosquitto/mosquitto.log
maxretry = 5

[http-flood]
enabled = true
port = http,https
filter = http-flood
logpath = /var/log/nginx/access.log
maxretry = 100
findtime = 60

Conclusão

Edge Computing com Linux representa o futuro do processamento distribuído e inteligente. Em 2025, a capacidade de processar dados localmente com latência mínima tornou-se diferencial competitivo crucial.

As configurações e scripts apresentados oferecem base sólida para implementar edge computing em diferentes cenários, desde IoT industrial até veículos autônomos. A combinação de Linux otimizado, MQTT para comunicação, machine learning local e monitoramento em tempo real cria infraestrutura robusta e eficiente.

Comece implementando um piloto com os componentes básicos – MQTT broker, coleta de dados e processamento local – e expanda gradualmente conforme suas necessidades específicas. O futuro está na borda da rede, e o Linux é a plataforma que vai levar você até lá.

Facebook
Twitter
LinkedIn