Design Parking Lot System

Designing a smart parking lot system at Meta scale means handling thousands of parking facilities, millions of vehicles, real-time spot availability, automated entry/exit with license plate recognition, dynamic pricing, and seamless payment processing. The system needs to support high throughput, low-latency updates, IoT sensor integration, and mobile app synchronization.


Step 1: Requirements and Scale Estimation

Functional Requirements

Core Parking Operations:

  • Real-time spot availability tracking: Monitor occupancy using IoT sensors and cameras across multiple floors and zones
  • Entry/exit management: Automated gate control with license plate recognition (LPR) and ticket generation
  • Vehicle type support: Handle different vehicle categories (compact, sedan, SUV, motorcycle, electric vehicle)
  • Spot allocation algorithm: Intelligent assignment based on proximity, vehicle size, and user preferences
  • Payment processing: Calculate fees based on duration, vehicle type, and dynamic pricing
  • Reservation system: Allow users to pre-book spots with holds and guarantees

Advanced Features:

  • Multi-level parking representation: Support complex floor layouts with zones and sections
  • Mobile app integration: Real-time availability, navigation, and remote payment
  • Admin dashboard: Monitoring, analytics, and configuration management
  • Access control: Permit management for monthly subscribers and VIP users
  • Notifications: Entry confirmation, payment reminders, and overstay alerts

Non-Functional Requirements

Performance:

  • Entry/exit latency: < 2 seconds for gate opening (LPR + verification)
  • Availability update latency: < 500ms from sensor trigger to app update
  • Payment processing: < 3 seconds end-to-end
  • Mobile app sync: Real-time spot availability with < 1 second staleness

Scalability:

  • Support 10,000 parking facilities globally
  • 500,000 parking spots across all facilities
  • 5 million daily entry/exit transactions
  • 50 million spot availability checks per day
  • 200,000 concurrent mobile app users

Reliability:

  • 99.99% uptime for core parking operations (entry/exit must always work)
  • Data durability: Zero loss of payment and transaction records
  • Graceful degradation: Manual ticketing fallback if LPR fails
  • Eventual consistency: Spot availability can be eventually consistent (< 1 second)

Security:

  • PCI DSS compliance for payment data
  • Encrypted vehicle and license plate data
  • Access control with role-based permissions
  • Audit logs for all financial transactions

Scale Estimation

Storage:

  • Parking spots: 500K spots × 1KB = 500MB
  • Facilities metadata: 10K facilities × 10KB = 100MB
  • Daily transactions: 5M × 2KB = 10GB/day → 3.6TB/year
  • Sensor data (compressed): 500K spots × 100 readings/day × 100 bytes = 5GB/day
  • License plate images: 5M × 200KB × 30 days retention = 30TB (S3)
  • Active sessions (Redis): 100K concurrent × 5KB = 500MB

Bandwidth:

  • Entry/exit transactions: 5M/day ÷ 86400 = ~58 TPS (peak 500 TPS)
  • Spot availability checks: 50M/day ÷ 86400 = ~580 QPS (peak 5000 QPS)
  • Mobile app WebSocket connections: 200K concurrent, 1KB/sec = 200MB/s
  • LPR image processing: 500 TPS × 200KB = 100MB/s peak
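
The storage and bandwidth figures above can be re-derived with a few lines of arithmetic. The sketch below only restates the numbers from the lists; it assumes decimal units (1 KB = 1,000 bytes), and the function and key names are illustrative.

```python
# Back-of-the-envelope check of the estimates above (decimal units assumed).
KB, MB, GB, TB = 10**3, 10**6, 10**9, 10**12
SECONDS_PER_DAY = 86_400

DAILY_TRANSACTIONS = 5_000_000
DAILY_AVAILABILITY_CHECKS = 50_000_000
SPOTS = 500_000


def estimates():
    return {
        "transactions_gb_per_day": DAILY_TRANSACTIONS * 2 * KB / GB,
        "transactions_tb_per_year": DAILY_TRANSACTIONS * 2 * KB * 365 / TB,
        "sensor_gb_per_day": SPOTS * 100 * 100 / GB,
        "lpr_images_tb_30_days": DAILY_TRANSACTIONS * 200 * KB * 30 / TB,
        "avg_entry_exit_tps": DAILY_TRANSACTIONS / SECONDS_PER_DAY,
        "avg_availability_qps": DAILY_AVAILABILITY_CHECKS / SECONDS_PER_DAY,
        "websocket_mb_per_s": 200_000 * 1 * KB / MB,
        "lpr_peak_mb_per_s": 500 * 200 * KB / MB,
    }


if __name__ == "__main__":
    for name, value in estimates().items():
        print(f"{name}: {value:,.2f}")
```

The peak figures (500 TPS, 5000 QPS) are roughly 9x the daily averages, which is the headroom the rest of the design is sized for.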

Step 2: High-Level Design

System Architecture

┌─────────────────────────────────────────────────────────────────┐
│                         Mobile Apps / Web                        │
│              (Real-time availability, Payment, Nav)              │
└────────────┬────────────────────────────────────────────────────┘

             ├─── REST API
             └─── WebSocket (real-time updates)

┌────────────▼─────────────────────────────────────────────────────┐
│                        API Gateway (Kong)                         │
│            Authentication, Rate Limiting, Routing                 │
└────────┬─────────────┬──────────────┬─────────────┬──────────────┘
         │             │              │             │
    ┌────▼────┐  ┌────▼────┐   ┌─────▼─────┐ ┌────▼──────┐
    │  Spot   │  │  Entry  │   │  Payment  │ │Reservation│
    │ Service │  │  /Exit  │   │  Service  │ │  Service  │
    │         │  │ Service │   │           │ │           │
    └────┬────┘  └────┬────┘   └─────┬─────┘ └────┬──────┘
         │            │              │            │
         │            │              │            │
    ┌────▼────────────▼──────────────▼────────────▼──────┐
    │              Redis Cluster (Cache)                  │
    │   - Spot availability (sorted sets by floor/zone)   │
    │   - Active sessions (hash maps)                     │
    │   - Reservation locks (distributed locks)           │
    └─────────────────────────────────────────────────────┘
         │            │              │            │
    ┌────▼────┐  ┌───▼──────┐  ┌────▼─────┐ ┌───▼──────┐
    │PostgreSQL│  │PostgreSQL│  │PostgreSQL│ │PostgreSQL│
    │  Spots  │  │  Parking │  │ Payments │ │ Reserved │
    │  DB     │  │ Sessions │  │    DB    │ │   DB     │
    └─────────┘  └────┬─────┘  └──────────┘ └──────────┘

    ┌─────────────────▼──────────────────────────────────┐
    │                Kafka Event Stream                   │
    │  - parking.entry.events                            │
    │  - parking.exit.events                             │
    │  - parking.spot.availability                       │
    │  - parking.payment.completed                       │
    └────┬────────────┬──────────────┬───────────────────┘
         │            │              │
    ┌────▼─────┐ ┌───▼─────┐  ┌─────▼──────┐
    │Analytics │ │  LPR    │  │ Monitoring │
    │ Service  │ │ Service │  │  Service   │
    │(Flink)   │ │  (ML)   │  │(Prometheus)│
    └──────────┘ └───┬─────┘  └────────────┘

    ┌────────────────▼──────────────────────────────────┐
    │          IoT Layer (Edge Computing)               │
    │                                                   │
    │  ┌───────────┐  ┌──────────┐  ┌─────────────┐     │
    │  │  Sensors  │  │  Cameras │  │    Gate     │     │
    │  │(Occupancy)│  │   (LPR)  │  │ Controllers │     │
    │  └───────────┘  └──────────┘  └─────────────┘     │
    └───────────────────────────────────────────────────┘

Core Services

1. Spot Availability Service

  • Manages real-time parking spot status across all facilities
  • Integrates with IoT sensors (ultrasonic, magnetic, camera-based)
  • Updates Redis cache with millisecond latency
  • Handles spot allocation algorithms
  • Provides WebSocket updates to mobile clients

2. Entry/Exit Service

  • Controls automated gate operations
  • Integrates with License Plate Recognition (LPR) system
  • Generates parking tickets and sessions
  • Validates reservations and permits
  • Manages entry/exit queues during peak times

3. Payment Service

  • Calculates parking fees based on duration and rules
  • Processes payments via multiple providers (Stripe, PayPal)
  • Handles dynamic pricing (surge, events, time-based)
  • Manages prepaid accounts and monthly subscriptions
  • Generates receipts and refunds

4. Reservation Service

  • Allows pre-booking of spots up to 30 days in advance
  • Implements distributed locking to prevent double-booking
  • Manages no-show policies and cancellations
  • Handles hold periods (15 min grace)
  • Supports group reservations for events

5. Monitoring & Analytics Service

  • Tracks facility utilization and revenue
  • Detects anomalies (sensor failures, overstays)
  • Provides predictive analytics for demand forecasting
  • Monitors system health and performance
  • Generates reports for facility managers
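
The double-booking and grace-period rules listed under the Reservation Service reduce to two small checks. The following is a minimal sketch under stated assumptions: reservations are treated as half-open intervals [start, end), and the function names and status strings are hypothetical rather than part of the service.

```python
from datetime import datetime, timedelta


def overlaps(start_a, end_a, start_b, end_b):
    """Half-open intervals overlap when each one starts before the other ends."""
    return start_a < end_b and start_b < end_a


def spot_is_free(existing, start, end):
    """existing: list of (start, end) tuples already booked for one spot."""
    return not any(overlaps(start, end, s, e) for s, e in existing)


def check_in_status(start_time, now, grace=timedelta(minutes=15)):
    """Classify an arrival relative to the reservation start and grace period."""
    if now < start_time - grace:
        return "TOO_EARLY"
    if now > start_time + grace:
        return "NO_SHOW"
    return "OK"


if __name__ == "__main__":
    nine = datetime(2024, 1, 1, 9, 0)
    booked = [(nine, nine + timedelta(hours=2))]
    print(spot_is_free(booked, nine + timedelta(hours=1), nine + timedelta(hours=3)))
    print(check_in_status(nine, nine + timedelta(minutes=10)))
```

With half-open intervals, a reservation that starts exactly when another ends does not count as a conflict, so back-to-back bookings of the same spot are allowed.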

Step 3: Deep Dives

3.1 Real-Time Spot Availability Tracking

IoT Sensor Integration:

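# Edge device sensor aggregator (deployed at each facility)
import json
import time

import paho.mqtt.client as mqtt
from kafka import KafkaProducer


class SensorAggregator:
    def __init__(self, facility_id, mqtt_broker, kafka_servers):
        self.facility_id = facility_id
        self.mqtt_broker = mqtt_broker  # Broker the facility's sensors publish to
        # paho-mqtt 2.x requires the callback API version
        self.mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        # The topic is chosen per send(); events are serialized as JSON
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def process_sensor_event(self, spot_id, occupied, confidence, sensor_type='ultrasonic'):
        """
        Sensors: Ultrasonic (95% accuracy), Magnetic (85%), Camera (98%)
        Use sensor fusion for higher confidence
        """
        event = {
            'facility_id': self.facility_id,
            'spot_id': spot_id,
            'occupied': occupied,
            'confidence': confidence,
            'timestamp': time.time(),
            'sensor_type': sensor_type  # 'ultrasonic', 'camera', or 'magnetic'
        }

        # Drop low-confidence readings to limit false positives
        if confidence > 0.85:
            self.kafka_producer.send('parking.spot.events', event)

    def handle_camera_feed(self, zone_id, frame):
        """
        Use computer vision to detect vehicles in multiple spots
        More accurate but higher latency than dedicated sensors
        """
        # ML model; only occupied spots are returned, so a vacated spot
        # must be reported by its dedicated sensor
        occupied_spots = self.detect_vehicles(frame)
        for spot_id, confidence in occupied_spots.items():
            self.process_sensor_event(spot_id, True, confidence, sensor_type='camera')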
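
The aggregator's docstring mentions sensor fusion without showing it. One possible approach, sketched here as an assumption rather than the method this design prescribes, is naive Bayes fusion in log-odds space. It treats the accuracy figures quoted above as symmetric per-sensor hit rates and assumes the sensors err independently; `fuse` and `SENSOR_ACCURACY` are illustrative names.

```python
import math

# Accuracy figures quoted in the aggregator docstring
SENSOR_ACCURACY = {"ultrasonic": 0.95, "magnetic": 0.85, "camera": 0.98}


def fuse(readings, prior=0.5):
    """Combine (sensor_type, occupied) readings into P(spot is occupied).

    Each reading shifts the log-odds by log(a / (1 - a)), where a is the
    sensor's accuracy: upward for "occupied", downward for "free".
    """
    log_odds = math.log(prior / (1 - prior))
    for sensor_type, occupied in readings:
        accuracy = SENSOR_ACCURACY[sensor_type]
        evidence = math.log(accuracy / (1 - accuracy))
        log_odds += evidence if occupied else -evidence
    return 1 / (1 + math.exp(-log_odds))


if __name__ == "__main__":
    print(fuse([("ultrasonic", True), ("magnetic", True)]))  # agreement raises confidence
    print(fuse([("ultrasonic", True), ("camera", False)]))   # the camera outweighs ultrasonic
```

Two agreeing sensors yield a higher probability than either alone, and when sensors disagree the more accurate one dominates.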

Spot Availability Service (Real-time updates):

class SpotAvailabilityService:
    def __init__(self):
        self.redis = RedisCluster(nodes=['redis-1', 'redis-2', 'redis-3'])
        self.postgres = PostgresConnection('spots_db')
        self.kafka_consumer = KafkaConsumer('parking.spot.events')
        self.websocket_manager = WebSocketManager()

    async def consume_sensor_events(self):
        """
        Process sensor events and update Redis in real-time
        """
        async for event in self.kafka_consumer:
            spot_id = event['spot_id']
            facility_id = event['facility_id']
            occupied = event['occupied']

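            # Skip events that do not change the spot's state; duplicate or
            # replayed readings would otherwise make the counters drift
            # (assumes the client was created with decode_responses=True)
            new_state = '1' if occupied else '0'
            if await self.redis.hget(f"spot:{spot_id}", 'occupied') == new_state:
                continue

            # Batch the writes in one pipeline to save round trips. This is not
            # a transaction: in Redis Cluster the keys below can live in
            # different hash slots, so readers may briefly see a partial update
            pipe = self.redis.pipeline()

            # Store spot status: HASH for spot metadata
            pipe.hset(f"spot:{spot_id}", mapping={
                'occupied': new_state,
                'updated_at': event['timestamp'],
                'vehicle_type': self.detect_vehicle_type(event)
            })

            # Update available spots per floor: SORTED SET
            floor = self.get_floor(spot_id)
            if not occupied:
                # Add to available spots (score = distance from entrance)
                pipe.zadd(
                    f"available:{facility_id}:{floor}",
                    {spot_id: self.get_distance_score(spot_id)}
                )
            else:
                pipe.zrem(f"available:{facility_id}:{floor}", spot_id)

            # Update facility-wide counter
            if occupied:
                pipe.hincrby(f"facility:{facility_id}", 'occupied_count', 1)
                pipe.hincrby(f"facility:{facility_id}", 'available_count', -1)
            else:
                pipe.hincrby(f"facility:{facility_id}", 'occupied_count', -1)
                pipe.hincrby(f"facility:{facility_id}", 'available_count', 1)

            await pipe.execute()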

            # Publish to WebSocket subscribers
            await self.websocket_manager.broadcast(
                facility_id,
                {'type': 'spot_update', 'spot_id': spot_id, 'occupied': occupied}
            )

            # Persist to PostgreSQL. Awaiting here keeps ordering simple but adds
            # the database write to every iteration of the consumer loop
            await self.persist_spot_event(event)

    async def get_available_spots(self, facility_id, floor, vehicle_type):
        """
        Get available spots sorted by proximity to entrance
        """
        # Redis ZRANGE: O(log(N)+M) where M is number of results
        available = await self.redis.zrange(
            f"available:{facility_id}:{floor}",
            0, 49  # Top 50 closest spots (the stop index is inclusive)
        )

        # Filter by vehicle type compatibility
        filtered = []
        for spot_id in available:
            spot_info = await self.redis.hgetall(f"spot:{spot_id}")
            if self.is_compatible(spot_info['type'], vehicle_type):
                filtered.append(spot_id)

        return filtered[:10]  # Return top 10

Redis Data Structure:

Key: spot:{spot_id}
Type: HASH
{
    "occupied": "0",
    "type": "compact",  // compact, sedan, suv, ev, motorcycle
    "floor": "2",
    "zone": "A",
    "distance_from_entrance": "150",  // meters
    "ev_charger": "1",
    "updated_at": "1706284800.123"
}

Key: available:{facility_id}:{floor}
Type: SORTED SET (score = distance from entrance)
{
    "spot_2A_001": 50.0,   // 50 meters from entrance
    "spot_2A_002": 52.3,
    "spot_2A_015": 75.8
}

Key: facility:{facility_id}
Type: HASH
{
    "total_spots": "500",
    "occupied_count": "387",
    "available_count": "113",
    "ev_available": "8"
}
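
To make the relationship between the three keys concrete, the sketch below models them in memory: a per-spot record, an availability view ordered by distance, and a facility-level count. It is a stand-in for reasoning about update rules, not a Redis client; the class name and the stale-event rule (ignore events whose timestamp is not newer than the last applied one) are assumptions for illustration.

```python
class FacilityState:
    """In-memory stand-in for the spot hash, sorted set, and facility counters."""

    def __init__(self, spot_distances):
        self.distance = dict(spot_distances)  # spot_id -> meters from entrance
        self.occupied = {spot: False for spot in self.distance}
        self.updated_at = {spot: 0.0 for spot in self.distance}

    def apply(self, spot_id, occupied, timestamp):
        """Apply one sensor event; return True only if the spot changed state."""
        if timestamp <= self.updated_at[spot_id]:
            return False  # stale or replayed event
        self.updated_at[spot_id] = timestamp
        if self.occupied[spot_id] == occupied:
            return False  # duplicate reading, counters stay untouched
        self.occupied[spot_id] = occupied
        return True

    @property
    def available_count(self):
        return sum(1 for taken in self.occupied.values() if not taken)

    def nearest_available(self, limit=10):
        """Free spots ordered by distance, like ZRANGE on the sorted set."""
        free = [spot for spot, taken in self.occupied.items() if not taken]
        return sorted(free, key=self.distance.get)[:limit]


if __name__ == "__main__":
    state = FacilityState({"spot_2A_001": 50.0, "spot_2A_002": 52.3, "spot_2A_015": 75.8})
    state.apply("spot_2A_001", True, timestamp=1.0)
    print(state.available_count, state.nearest_available())
```

Deriving the count from spot state, rather than incrementing it on every event, is what keeps duplicate sensor readings from skewing availability.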

3.2 Entry/Exit Gate Control with License Plate Recognition

LPR Service with ML:

class LicensePlateRecognitionService:
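    def __init__(self):
        # Use pre-trained model (e.g., YOLO for detection, OCR for reading)
        # YOLOv8 and EasyOCR stand for thin wrappers around the model libraries
        self.detector = YOLOv8('license_plate_detector.pt')
        self.ocr = EasyOCR(['en'])
        self.s3 = S3Client('lpr-images')
        self.redis = RedisCluster()
        # Used by process_entry() for permits, session storage, and events
        self.db = PostgresConnection('sessions_db')
        self.kafka_producer = KafkaProducer()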

    async def process_entry(self, camera_id, image_bytes):
        """
        Process entry camera image:
        1. Detect license plate
        2. Extract text via OCR
        3. Verify against database
        4. Open gate if authorized
        """
        start_time = time.time()

        # Step 1: Detect plate region (50-100ms)
        plate_regions = self.detector.predict(image_bytes)
        if not plate_regions:
            return {'success': False, 'error': 'NO_PLATE_DETECTED'}

        # Step 2: OCR on plate region (100-200ms)
        plate_image = self.extract_plate(image_bytes, plate_regions[0])
        plate_text = self.ocr.readtext(plate_image)
        plate_number = self.normalize_plate(plate_text)

        # Step 3: Check for reservation or permit (Redis lookup, <10ms)
        vehicle_info = await self.redis.hgetall(f"vehicle:{plate_number}")

        if not vehicle_info:
            # Check PostgreSQL for monthly permit
            permits = await self.db.query(
                "SELECT * FROM permits WHERE plate_number = $1 AND active = true",
                plate_number
            )
            # Unknown plates enter as regular pay-on-exit customers
            vehicle_info = permits[0] if permits else {}

        # Step 4: Generate parking session
        session_id = self.generate_session_id()
        session = {
            'session_id': session_id,
            'plate_number': plate_number,
            'facility_id': camera_id.facility_id,
            'entry_time': time.time(),
            'vehicle_type': vehicle_info.get('vehicle_type', 'sedan'),
            'has_reservation': bool(vehicle_info.get('reservation_id'))
        }

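        # Store session in Redis (fast access during exit)
        session_json = json.dumps(session)
        await self.redis.setex(
            f"session:{session_id}",
            86400,  # 24 hour TTL
            session_json
        )
        # Index by plate as well: the exit gate only knows the plate number
        await self.redis.setex(f"active:{plate_number}", 86400, session_json)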

        # Store in PostgreSQL for durability
        await self.db.insert('parking_sessions', session)

        # Upload image to S3 for audit trail
        await self.s3.upload_async(
            f"entries/{session_id}.jpg",
            image_bytes,
            metadata={'plate': plate_number, 'timestamp': session['entry_time']}
        )

        # Publish entry event
        await self.kafka_producer.send('parking.entry.events', session)

        latency = time.time() - start_time
        logger.info(f"LPR processing completed in {latency*1000:.0f}ms")

        return {
            'success': True,
            'session_id': session_id,
            'plate_number': plate_number,
            'vehicle_type': session['vehicle_type'],
            'has_reservation': session['has_reservation'],
            'gate_action': 'OPEN'
        }

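    def normalize_plate(self, ocr_result):
        """
        Normalize OCR output: uppercase and strip non-alphanumeric characters
        Example: "ABC 1234" -> "ABC1234"
        Character-confusion fixes ("O" -> "0" in a digit position) depend on the
        regional plate format and are not applied here
        """
        # ocr_result is assumed to be a list of text fragments
        plate = ''.join(ocr_result).upper()
        plate = ''.join(ch for ch in plate if ch.isalnum())
        # Apply state-specific formatting rules
        return plate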
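
The position-aware correction mentioned in the docstring ("O" -> "0" in number context) needs a known plate layout. The sketch below assumes a hypothetical layout of three letters followed by four digits, matching the "ABC1234" example; real formats vary by region, and the confusion tables are illustrative, not exhaustive.

```python
import re

# Common OCR look-alikes, applied only where the layout expects the other class
TO_DIGIT = str.maketrans({"O": "0", "I": "1", "S": "5", "B": "8"})
TO_LETTER = str.maketrans({"0": "O", "1": "I", "5": "S", "8": "B"})


def normalize_plate(fragments, letters=3, digits=4):
    """Join OCR fragments and repair look-alike characters by position.

    Returns the normalized plate, or None when the text cannot match the
    assumed layout (letters followed by digits).
    """
    raw = re.sub(r"[^A-Z0-9]", "", "".join(fragments).upper())
    if len(raw) != letters + digits:
        return None
    plate = raw[:letters].translate(TO_LETTER) + raw[letters:].translate(TO_DIGIT)
    pattern = rf"[A-Z]{{{letters}}}[0-9]{{{digits}}}"
    return plate if re.fullmatch(pattern, plate) else None


if __name__ == "__main__":
    print(normalize_plate(["ABC 1234"]))
    print(normalize_plate(["abc", "12O4"]))
```

Returning None for text that cannot fit the layout lets the caller fall back to manual ticketing instead of opening a session under a misread plate.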

Entry/Exit Service:

class EntryExitService:
    def __init__(self):
        self.lpr_service = LicensePlateRecognitionService()
        self.spot_service = SpotAvailabilityService()
        self.allocation_service = SpotAllocationService()
        self.payment_service = PaymentService()
        self.gate_controller = GateController()
        self.redis = RedisCluster()
        self.kafka_producer = KafkaProducer()

    async def handle_entry(self, facility_id, gate_id, image):
        """
        Entry workflow:
        1. LPR recognition
        2. Check availability
        3. Allocate spot
        4. Open gate
        5. Generate ticket
        """
        # Step 1: Recognize plate
        lpr_result = await self.lpr_service.process_entry(gate_id, image)
        if not lpr_result['success']:
            await self.gate_controller.display_message(gate_id, "Please position vehicle")
            return

        # Step 2: Check if facility has capacity
        capacity = await self.spot_service.get_available_count(facility_id)
        if capacity == 0:
            await self.gate_controller.display_message(gate_id, "PARKING FULL")
            return

        # Step 3: Allocate spot using allocation algorithm (section 3.3)
        vehicle_type = lpr_result.get('vehicle_type', 'sedan')
        allocated_spot = await self.allocation_service.allocate_spot(
            facility_id,
            vehicle_type,
            has_reservation=lpr_result.get('has_reservation', False)
        )

        # Record the spot on the session so the exit flow can free it
        # (save_allocated_spot is a helper not shown here)
        await self.save_allocated_spot(lpr_result['session_id'], allocated_spot)

        # Step 4: Open gate (send command to IoT controller)
        await self.gate_controller.open_gate(gate_id, duration=10)

        # Step 5: Display info and generate ticket
        await self.gate_controller.display_message(
            gate_id,
            f"Welcome! Spot: {allocated_spot}\nFloor: {self.get_floor(allocated_spot)}"
        )

        # Print physical ticket (optional, mostly mobile now)
        ticket = {
            'session_id': lpr_result['session_id'],
            'entry_time': datetime.now(),
            'allocated_spot': allocated_spot
        }
        await self.gate_controller.print_ticket(gate_id, ticket)

        # Send push notification to mobile app
        await self.notify_user(lpr_result['plate_number'], ticket)

    async def handle_exit(self, facility_id, gate_id, image):
        """
        Exit workflow:
        1. LPR recognition
        2. Lookup session
        3. Calculate payment
        4. Verify payment
        5. Open gate
        6. Free spot
        """
        # Read the plate only; calling process_entry() here would open a
        # second session for a vehicle that is leaving
        lpr = self.lpr_service
        plate_regions = lpr.detector.predict(image)
        if not plate_regions:
            await self.gate_controller.display_message(gate_id, "Please position vehicle")
            return
        plate_image = lpr.extract_plate(image, plate_regions[0])
        plate_number = lpr.normalize_plate(lpr.ocr.readtext(plate_image))

        # Lookup active session (stored as a JSON string)
        raw_session = await self.redis.get(f"active:{plate_number}")
        if not raw_session:
            await self.gate_controller.display_message(gate_id, "No active session found")
            return
        session = json.loads(raw_session)

        # Calculate parking fee
        duration = time.time() - session['entry_time']
        fee = await self.payment_service.calculate_fee(facility_id, duration, session['vehicle_type'])

        # Check if already paid (via app or prepaid account)
        payment_status = await self.payment_service.get_payment_status(session['session_id'])

        if payment_status != 'PAID':
            # Display amount and payment options
            await self.gate_controller.display_payment(gate_id, fee)
            # Wait for payment (card reader at gate)
            payment = await self.gate_controller.wait_for_payment(gate_id, timeout=120)
            if not payment:
                # Keep the gate closed and the session open
                await self.gate_controller.display_message(gate_id, "Payment not completed")
                return

        # Open gate
        await self.gate_controller.open_gate(gate_id, duration=10)

        # Mark session as completed and drop the plate index
        await self.complete_session(session['session_id'])
        await self.redis.delete(f"active:{plate_number}")

        # Free the spot
        await self.spot_service.free_spot(session['allocated_spot'])

        # Publish exit event
        await self.kafka_producer.send('parking.exit.events', {
            'session_id': session['session_id'],
            'exit_time': time.time(),
            'duration': duration,
            'fee': fee
        })

3.3 Parking Spot Allocation Algorithm

Smart Allocation Strategy:

class SpotAllocationService:
    def __init__(self):
        self.redis = RedisCluster()

    async def allocate_spot(self, facility_id, vehicle_type, has_reservation=False, preferences=None):
        """
        Multi-criteria allocation algorithm:
        1. Reservation: Return reserved spot
        2. Vehicle size matching: Compact car -> compact spot (don't waste large spots)
        3. Proximity: Nearest to entrance/elevator
        4. Special requirements: EV charger, handicap, etc.
        5. Load balancing: Distribute across floors to avoid congestion
        """
        if has_reservation:
            return await self.get_reserved_spot(facility_id, vehicle_type)

        # Get all available spots for this vehicle type
        floors = await self.get_floors(facility_id)
        candidates = []

        for floor in floors:
            # Get available spots sorted by distance
            spots = await self.redis.zrange(
                f"available:{facility_id}:{floor}",
                0, -1,
                withscores=True  # Get distance scores
            )

            for spot_id, distance in spots:
                spot_info = await self.redis.hgetall(f"spot:{spot_id}")

                # Filter by vehicle compatibility
                if not self.is_size_compatible(spot_info['type'], vehicle_type):
                    continue

                # Check special requirements
                if preferences:
                    if preferences.get('ev_charger') and spot_info.get('ev_charger') != '1':
                        continue
                    if preferences.get('handicap') and spot_info.get('handicap') != '1':
                        continue

                # Calculate score based on multiple factors
                score = self.calculate_allocation_score(
                    distance=distance,
                    floor=floor,
                    spot_info=spot_info,
                    vehicle_type=vehicle_type,
                    preferences=preferences
                )

                candidates.append((spot_id, floor, score))

        if not candidates:
            raise NoSpotAvailableException()

        # Try candidates from best to worst score
        candidates.sort(key=lambda c: c[2], reverse=True)

        for spot_id, spot_floor, _ in candidates:
            # Claim the spot with SET NX so two concurrent entries cannot both
            # take it; the key expires if the vehicle never parks
            lock_acquired = await self.redis.set(
                f"lock:spot:{spot_id}",
                "RESERVED",
                nx=True,  # Only set if not exists
                ex=300    # 5 minute expiry
            )
            if not lock_acquired:
                continue  # Taken by another vehicle, try next best

            # Mark spot as occupied and remove it from its own floor's set
            await self.redis.hset(f"spot:{spot_id}", 'occupied', '1')
            await self.redis.zrem(f"available:{facility_id}:{spot_floor}", spot_id)
            return spot_id

        # Every candidate was claimed concurrently
        raise NoSpotAvailableException()

    def calculate_allocation_score(self, distance, floor, spot_info, vehicle_type, preferences):
        """
        Weighted scoring algorithm:
        - Distance: 40% weight (closer is better)
        - Floor: 30% weight (lower floors preferred for quick access)
        - Size match: 20% weight (exact match preferred to save large spots)
        - Special features: 10% weight (EV charger, covered, etc.)
        """
        score = 0

        # Distance score (inverse: lower distance = higher score), clamped at 0
        max_distance = 200  # meters
        score += max(0.0, 1 - distance / max_distance) * 0.4

        # Floor score (ground floor = best), clamped at 0 above floor 10
        floor_num = int(floor)
        score += max(0.0, 1 - floor_num / 10) * 0.3

        # Size match score
        if spot_info['type'] == vehicle_type:
            score += 0.2  # Exact match
        elif self.is_size_compatible(spot_info['type'], vehicle_type):
            score += 0.1  # Compatible but not ideal

        # Special features (two bonuses of 0.05 keep the total at the 10% weight)
        if preferences:
            if preferences.get('ev_charger') and spot_info.get('ev_charger') == '1':
                score += 0.05
            if preferences.get('covered') and spot_info.get('covered') == '1':
                score += 0.05

        return score

    def is_size_compatible(self, spot_type, vehicle_type):
        """
        Size compatibility matrix:
        Compact spot: motorcycle, compact only
        Sedan spot: motorcycle, compact, sedan
        SUV spot: all vehicle types
        """
        # Keyed by vehicle type; each value lists the spot types that fit it
        compatibility = {
            'motorcycle': ['motorcycle', 'compact', 'sedan', 'suv'],
            'compact': ['compact', 'sedan', 'suv'],
            'sedan': ['sedan', 'suv'],
            'suv': ['suv'],
            # Assumption: EVs are sedan-sized and may also use EV spots
            'ev': ['ev', 'sedan', 'suv']
        }
        return spot_type in compatibility.get(vehicle_type, [])
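
A small worked example shows how the weights interact. The sketch below re-implements the distance, floor, and size terms as standalone functions (special-feature bonuses are left out) and ranks three made-up spots for a compact car; the spot data and function names are illustrative only.

```python
# Vehicle type -> spot types it fits in (same matrix as is_size_compatible)
COMPATIBLE_SPOTS = {
    "motorcycle": {"motorcycle", "compact", "sedan", "suv"},
    "compact": {"compact", "sedan", "suv"},
    "sedan": {"sedan", "suv"},
    "suv": {"suv"},
}


def allocation_score(distance, floor, spot_type, vehicle_type,
                     max_distance=200.0, max_floor=10):
    """Weighted score: 40% distance, 30% floor, up to 20% size match."""
    distance_part = max(0.0, 1 - distance / max_distance) * 0.4
    floor_part = max(0.0, 1 - floor / max_floor) * 0.3
    size_part = 0.2 if spot_type == vehicle_type else 0.1
    return distance_part + floor_part + size_part


def rank_spots(spots, vehicle_type):
    """Return (spot_id, score) pairs for compatible spots, best first."""
    allowed = COMPATIBLE_SPOTS.get(vehicle_type, set())
    scored = [
        (s["id"], allocation_score(s["distance"], s["floor"], s["type"], vehicle_type))
        for s in spots
        if s["type"] in allowed
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


spots = [
    {"id": "A", "floor": 1, "distance": 50.0, "type": "suv"},
    {"id": "B", "floor": 2, "distance": 80.0, "type": "compact"},
    {"id": "C", "floor": 1, "distance": 10.0, "type": "motorcycle"},
]
ranking = rank_spots(spots, "compact")

if __name__ == "__main__":
    for spot_id, score in ranking:
        print(spot_id, round(score, 2))
```

Spot B scores 0.68 (0.24 + 0.24 + 0.20) and spot A scores 0.67 (0.30 + 0.27 + 0.10), so the exact-size match narrowly beats the closer SUV spot, which is the "don't waste large spots" behavior the algorithm aims for. Spot C is excluded because a compact car does not fit a motorcycle spot.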

3.4 Payment Calculation with Dynamic Pricing

Payment Service:

class PaymentService:
    def __init__(self):
        self.postgres = PostgresConnection('payments_db')
        self.redis = RedisCluster()
        self.stripe = StripeClient()

    async def calculate_fee(self, facility_id, duration_seconds, vehicle_type):
        """
        Dynamic pricing algorithm:
        1. Base rate (per hour, varies by facility and location)
        2. Vehicle type multiplier
        3. Time-based pricing (peak hours cost more)
        4. Event-based surge pricing
        5. Maximum daily cap
        """
        hours = duration_seconds / 3600

        # Get base rate from Redis cache (fallback to DB)
        rate_key = f"rate:{facility_id}:{datetime.now().hour}"
        base_rate = await self.redis.get(rate_key)

        if not base_rate:
            pricing_rules = await self.postgres.query(
                "SELECT * FROM pricing_rules WHERE facility_id = $1",
                facility_id
            )
            # query() returns a list of rows (as in get_surge_multiplier)
            base_rate = pricing_rules[0]['hourly_rate']
            await self.redis.setex(rate_key, 3600, base_rate)

        base_rate = float(base_rate)

        # Vehicle type multiplier
        vehicle_multipliers = {
            'motorcycle': 0.5,
            'compact': 1.0,
            'sedan': 1.2,
            'suv': 1.5,
            'ev': 1.3  # Includes charging cost
        }
        multiplier = vehicle_multipliers.get(vehicle_type, 1.0)

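        # Time-based pricing (peak hours). The hour is taken at fee-calculation
        # time and applied to the whole stay, not prorated across the visit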
        hour = datetime.now().hour
        if 7 <= hour <= 10 or 17 <= hour <= 20:  # Rush hours
            multiplier *= 1.5
        elif 22 <= hour or hour <= 6:  # Night discount
            multiplier *= 0.8

        # Event-based surge (check if special event today)
        surge = await self.get_surge_multiplier(facility_id, datetime.now())
        multiplier *= surge

        # Calculate total
        total = base_rate * hours * multiplier

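        # Apply maximum daily cap, once per started 24-hour period
        daily_max = 50.0  # $50 max per day
        days = max(1, int(-(-duration_seconds // 86400)))  # Ceiling division
        total = min(total, daily_max * days)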

        # Minimum charge (15 minutes minimum)
        min_charge = base_rate * 0.25 * multiplier
        total = max(total, min_charge)

        return round(total, 2)

    async def process_payment(self, session_id, amount, payment_method):
        """
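        Process payment via Stripe (single attempt; failures are recorded and
        returned to the caller, which decides whether to retry)
        """
        payment_intent = await self.stripe.create_payment_intent(
            amount=int(round(amount * 100)),  # Cents; round() avoids float truncation
            currency='usd',
            metadata={'session_id': session_id}
        )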

        # Store payment record
        payment_record = {
            'session_id': session_id,
            'amount': amount,
            'currency': 'USD',
            'status': 'PENDING',
            'payment_intent_id': payment_intent.id,
            'created_at': datetime.now()
        }

        await self.postgres.insert('payments', payment_record)

        # Confirm payment
        try:
            result = await self.stripe.confirm_payment(
                payment_intent.id,
                payment_method=payment_method
            )

            if result.status == 'succeeded':
                await self.postgres.update(
                    'payments',
                    {'status': 'PAID'},
                    {'payment_intent_id': payment_intent.id}
                )

                # Publish payment event
                await self.kafka_producer.send('parking.payment.completed', {
                    'session_id': session_id,
                    'amount': amount,
                    'timestamp': time.time()
                })

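                return {'success': True, 'transaction_id': payment_intent.id}

            # Any other status is not a completed payment; report it
            return {'success': False, 'error': f"Unexpected payment status: {result.status}"}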
        except StripeError as e:
            logger.error(f"Payment failed: {e}")
            await self.postgres.update(
                'payments',
                {'status': 'FAILED', 'error': str(e)},
                {'payment_intent_id': payment_intent.id}
            )
            return {'success': False, 'error': str(e)}

    async def get_surge_multiplier(self, facility_id, date):
        """
        Check for special events (concerts, sports games) near facility
        """
        events = await self.postgres.query(
            """
            SELECT surge_multiplier FROM events
            WHERE facility_id = $1
            AND event_date = $2
            AND active = true
            """,
            facility_id, date.date()
        )

        if events:
            return events[0]['surge_multiplier']
        return 1.0
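
Monetary amounts are easier to reason about with exact decimal arithmetic than with floats. The sketch below is a simplified variant of the fee calculation, not a drop-in replacement: it takes the combined multiplier as an input, bills a 15-minute minimum, and applies the daily cap once per started 24-hour period. The function names and defaults are assumptions for illustration.

```python
import math
from decimal import Decimal, ROUND_HALF_UP


def parking_fee(duration_seconds, hourly_rate,
                multiplier=Decimal("1.0"), daily_cap=Decimal("50.00")):
    """Fee = rate x billable hours x multiplier, capped per started day."""
    hours = Decimal(duration_seconds) / Decimal(3600)
    billable_hours = max(hours, Decimal("0.25"))  # 15-minute minimum charge
    total = Decimal(hourly_rate) * billable_hours * Decimal(multiplier)
    days = max(1, math.ceil(duration_seconds / 86400))
    total = min(total, daily_cap * days)
    return total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def to_cents(amount):
    """Convert a Decimal amount to integer cents for the payment provider."""
    return int((amount * 100).to_integral_value())


if __name__ == "__main__":
    print(parking_fee(2 * 3600, Decimal("4.00"), Decimal("1.2")))  # two-hour sedan stay
    print(parking_fee(3 * 86400, Decimal("4.00")))                 # three days, capped
    print(to_cents(Decimal("19.99")))
```

Converting to cents from a Decimal avoids the case where a float product lands just below a whole cent and `int()` truncates it.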

3.5 Reservation System with Distributed Locking

Reservation Service:

class ReservationService:
    def __init__(self):
        self.postgres = PostgresConnection('reservations_db')
        self.redis = RedisCluster()

    async def create_reservation(self, user_id, facility_id, start_time, duration, vehicle_type, preferences=None):
        """
        Create parking spot reservation with distributed locking
        """
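        # Validate start time (not in the past, up to 30 days in advance)
        now = datetime.now()
        if start_time < now:
            raise ValidationError("Start time must be in the future")
        if start_time > now + timedelta(days=30):
            raise ValidationError("Cannot reserve more than 30 days in advance")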

        # Check availability for requested time slot
        available = await self.check_availability(
            facility_id, start_time, duration, vehicle_type
        )

        if not available:
            raise NoAvailabilityError("No spots available for requested time")

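        # Serialize reservations per facility. Keying the lock on start_time would
        # let overlapping requests with different start times run concurrently
        # (Redlock proper needs several independent Redis nodes, not one cluster)
        lock_key = f"lock:reservation:{facility_id}"
        lock = RedLock([self.redis], lock_key, ttl=5000)  # 5 second lock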

        if not await lock.acquire():
            raise ConcurrencyError("Unable to acquire lock, please try again")

        try:
            # Double-check availability within lock
            available = await self.check_availability(
                facility_id, start_time, duration, vehicle_type
            )

            if not available:
                raise NoAvailabilityError("Spot taken by another reservation")

            # Find best spot for reservation
            allocated_spot = await self.find_reservable_spot(
                facility_id, start_time, duration, vehicle_type, preferences
            )

            # Create reservation record
            reservation_id = self.generate_reservation_id()
            reservation = {
                'reservation_id': reservation_id,
                'user_id': user_id,
                'facility_id': facility_id,
                'spot_id': allocated_spot,
                'start_time': start_time,
                'end_time': start_time + timedelta(seconds=duration),
                'vehicle_type': vehicle_type,
                'status': 'CONFIRMED',
                'created_at': datetime.now()
            }

            await self.postgres.insert('reservations', reservation)

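            # Cache in Redis for fast lookup during entry. Keep the entry until
            # one hour after the reservation ends; datetimes become ISO strings
            ttl = int((reservation['end_time'] - datetime.now()).total_seconds()) + 3600
            await self.redis.setex(
                f"reservation:{reservation_id}",
                ttl,
                json.dumps(reservation, default=lambda v: v.isoformat())
            )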

            # Block spot in availability system
            await self.redis.sadd(
                f"reserved_spots:{facility_id}:{start_time.date()}",
                allocated_spot
            )

        finally:
            await lock.release()

        # Send confirmation after releasing the lock, so a slow notification
        # cannot hold the lock past its 5-second TTL
        await self.send_confirmation(user_id, reservation)

        return reservation

    async def check_in_reservation(self, reservation_id, plate_number):
        """
        Activate reservation when vehicle arrives (grace period: 15 min)
        """
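        cached = await self.redis.get(f"reservation:{reservation_id}")

        if cached:
            # Redis returns the JSON string stored at booking time
            reservation = json.loads(cached)
        else:
            reservation = await self.postgres.query(
                "SELECT * FROM reservations WHERE reservation_id = $1",
                reservation_id
            )
            if not reservation:
                raise ValidationError("Reservation not found")

        # Check if within grace period
        now = datetime.now()
        start_time = reservation['start_time']
        if isinstance(start_time, str):
            # The cached copy stores timestamps as strings
            start_time = datetime.fromisoformat(start_time)
        grace_period = timedelta(minutes=15)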

        if now < start_time - grace_period:
            raise ValidationError("Too early for check-in")

        if now > start_time + grace_period:
            # No-show: cancel reservation and charge fee
            await self.handle_no_show(reservation_id)
            raise ValidationError("Reservation expired (no-show)")

        # Activate reservation
        await self.postgres.update(
            'reservations',
            {'status': 'ACTIVE', 'plate_number': plate_number, 'check_in_time': now},
            {'reservation_id': reservation_id}
        )

        return reservation

    async def handle_no_show(self, reservation_id):
        """
        Charge no-show fee (typically 20% of estimated parking cost)
        """
        reservation = await self.get_reservation(reservation_id)

        # Calculate no-show fee
        estimated_cost = await self.payment_service.calculate_fee(
            reservation['facility_id'],
            # total_seconds() includes whole days; .seconds would silently drop them
            int((reservation['end_time'] - reservation['start_time']).total_seconds()),
            reservation['vehicle_type']
        )
        no_show_fee = estimated_cost * 0.2

        # Charge user
        await self.payment_service.charge_user(
            reservation['user_id'],
            no_show_fee,
            description=f"No-show fee for reservation {reservation_id}"
        )

        # Mark reservation as no-show
        await self.postgres.update(
            'reservations',
            {'status': 'NO_SHOW', 'fee_charged': no_show_fee},
            {'reservation_id': reservation_id}
        )

        # Free the spot
        await self.redis.srem(
            f"reserved_spots:{reservation['facility_id']}:{reservation['start_time'].date()}",
            reservation['spot_id']
        )
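
The check-in window and no-show fee rules above can be isolated as pure functions, which makes them easy to test without Redis or PostgreSQL. The following is a minimal sketch, not the service's actual code: the function names are hypothetical, and it assumes fees are handled as Decimal values rather than floats to avoid rounding drift.

```python
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP

GRACE_PERIOD = timedelta(minutes=15)  # grace period from check_in_reservation
NO_SHOW_RATE = Decimal("0.20")        # "typically 20%" from handle_no_show


def classify_check_in(now: datetime, start_time: datetime) -> str:
    """Return 'TOO_EARLY', 'NO_SHOW', or 'OK' for a check-in attempt."""
    if now < start_time - GRACE_PERIOD:
        return "TOO_EARLY"
    if now > start_time + GRACE_PERIOD:
        return "NO_SHOW"
    return "OK"


def reserved_seconds(start_time: datetime, end_time: datetime) -> int:
    """Reserved duration in seconds, including any whole days."""
    return int((end_time - start_time).total_seconds())


def no_show_fee(estimated_cost: Decimal) -> Decimal:
    """No-show fee as a share of the estimated cost, rounded to cents."""
    fee = estimated_cost * NO_SHOW_RATE
    return fee.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
```

Both window boundaries are inclusive here, matching the strict comparisons in check_in_reservation: arriving exactly 15 minutes late still counts as on time.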

3.6 Multi-Level Parking Lot Representation

Data Model (PostgreSQL):

-- Facilities (parking lots)
CREATE TABLE facilities (
    facility_id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    address TEXT,
    latitude DECIMAL(10, 8),
    longitude DECIMAL(11, 8),
    total_floors INT,
    total_spots INT,
    operating_hours JSONB,  -- {"open": "00:00", "close": "24:00"}
    features JSONB,  -- ["ev_charging", "covered", "security"]
    created_at TIMESTAMP DEFAULT NOW()
);

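-- ll_to_earth() comes from the earthdistance extension, which depends on cube:
--   CREATE EXTENSION IF NOT EXISTS cube;
--   CREATE EXTENSION IF NOT EXISTS earthdistance;
CREATE INDEX idx_facilities_location ON facilities USING GIST (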
    ll_to_earth(latitude, longitude)
);

-- Floors within facilities
CREATE TABLE floors (
    floor_id UUID PRIMARY KEY,
    facility_id UUID REFERENCES facilities(facility_id),
    floor_number INT,  -- 0 = ground, -1 = basement 1, etc.
    total_spots INT,
    layout_map JSONB,  -- SVG or coordinate-based layout
    entrance_coordinates JSONB,  -- {"x": 0, "y": 0}
    elevator_coordinates JSONB[],
    UNIQUE(facility_id, floor_number)
);

-- Parking spots
CREATE TABLE spots (
    spot_id UUID PRIMARY KEY,
    floor_id UUID REFERENCES floors(floor_id),
    facility_id UUID REFERENCES facilities(facility_id),
    spot_number VARCHAR(20),  -- "2A-015" (Floor 2, Zone A, Spot 15)
    spot_type VARCHAR(20),  -- compact, sedan, suv, motorcycle, ev, handicap
    zone VARCHAR(10),  -- A, B, C for easier navigation
    coordinates JSONB,  -- {"x": 150, "y": 200} for map display
    distance_from_entrance DECIMAL(6, 2),  -- meters
    features JSONB,  -- {"ev_charger": true, "covered": true}
    status VARCHAR(20) DEFAULT 'AVAILABLE',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_spots_facility ON spots(facility_id);
CREATE INDEX idx_spots_floor ON spots(floor_id);
CREATE INDEX idx_spots_type ON spots(spot_type);
CREATE INDEX idx_spots_status ON spots(status);

-- Parking sessions
CREATE TABLE parking_sessions (
    session_id UUID PRIMARY KEY,
    facility_id UUID REFERENCES facilities(facility_id),
    spot_id UUID REFERENCES spots(spot_id),
    plate_number VARCHAR(20),
    vehicle_type VARCHAR(20),
    entry_time TIMESTAMP,
    exit_time TIMESTAMP,
    entry_image_url TEXT,  -- S3 URL
    exit_image_url TEXT,
    status VARCHAR(20),  -- ACTIVE, COMPLETED, OVERSTAY
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_sessions_plate ON parking_sessions(plate_number);
CREATE INDEX idx_sessions_facility_time ON parking_sessions(facility_id, entry_time);

-- Reservations
CREATE TABLE reservations (
    reservation_id UUID PRIMARY KEY,
    user_id UUID,
    facility_id UUID REFERENCES facilities(facility_id),
    spot_id UUID REFERENCES spots(spot_id),
    plate_number VARCHAR(20),
    vehicle_type VARCHAR(20),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    check_in_time TIMESTAMP,
    status VARCHAR(20),  -- CONFIRMED, ACTIVE, COMPLETED, CANCELLED, NO_SHOW
    preferences JSONB,  -- {"ev_charger": true}
    fee_charged DECIMAL(10, 2),  -- no-show fee written by handle_no_show
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_reservations_user ON reservations(user_id);
CREATE INDEX idx_reservations_facility_time ON reservations(facility_id, start_time);
CREATE INDEX idx_reservations_status ON reservations(status);

-- Payments
CREATE TABLE payments (
    payment_id UUID PRIMARY KEY,
    session_id UUID REFERENCES parking_sessions(session_id),
    amount DECIMAL(10, 2),
    currency VARCHAR(3) DEFAULT 'USD',
    payment_method VARCHAR(50),
    payment_intent_id VARCHAR(255),  -- Stripe payment intent
    status VARCHAR(20),  -- PENDING, PAID, FAILED, REFUNDED
    error_message TEXT,
    created_at TIMESTAMP DEFAULT NOW(),
    paid_at TIMESTAMP
);

CREATE INDEX idx_payments_session ON payments(session_id);
CREATE INDEX idx_payments_status ON payments(status);
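
The reservations table stores a start_time and end_time per spot, so an availability check reduces to an interval-overlap test. The sketch below shows one way to express that test in application code. It is an illustration under assumptions: the Booking type and free_spots helper are hypothetical, and intervals are treated as half-open so that back-to-back bookings on the same spot do not conflict.

```python
from datetime import datetime
from typing import Iterable, List, NamedTuple


class Booking(NamedTuple):
    spot_id: str
    start_time: datetime
    end_time: datetime


def overlaps(a_start: datetime, a_end: datetime,
             b_start: datetime, b_end: datetime) -> bool:
    """True if half-open intervals [a_start, a_end) and [b_start, b_end) intersect."""
    return a_start < b_end and b_start < a_end


def free_spots(all_spots: Iterable[str], bookings: Iterable[Booking],
               start: datetime, end: datetime) -> List[str]:
    """Spots with no booking overlapping the requested window, in input order."""
    taken = {
        b.spot_id
        for b in bookings
        if overlaps(b.start_time, b.end_time, start, end)
    }
    return [spot for spot in all_spots if spot not in taken]
```

The same predicate translates directly into a WHERE clause (start_time < requested_end AND end_time > requested_start), which the idx_reservations_facility_time index can help narrow.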

3.7 Mobile App Integration with Real-Time Updates

WebSocket Server for Real-Time Updates:

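import json
import logging

logger = logging.getLogger(__name__)

class WebSocketManager:
    def __init__(self):
        self.redis = RedisCluster()
        self.db = PostgresClient()  # hypothetical PostgreSQL client, used by send_initial_state
        self.connections = {}  # {user_id: [websocket_connection]}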

    async def handle_connection(self, websocket, user_id):
        """
        Handle WebSocket connection from mobile app
        """
        # Register connection
        if user_id not in self.connections:
            self.connections[user_id] = []
        self.connections[user_id].append(websocket)

        # Send initial state
        await self.send_initial_state(websocket, user_id)

        try:
            async for message in websocket:
                await self.handle_message(websocket, user_id, message)
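        finally:
            # Cleanup on disconnect; drop the user entry once no connections remain
            self.connections[user_id].remove(websocket)
            if not self.connections[user_id]:
                del self.connections[user_id]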

    async def send_initial_state(self, websocket, user_id):
        """
        Send user's active session and reservation data
        """
        # Active parking session
        active_session = await self.redis.get(f"user_session:{user_id}")
        if active_session:
            await websocket.send(json.dumps({
                'type': 'active_session',
                'data': json.loads(active_session)
            }))

        # Upcoming reservations
        reservations = await self.db.query(
            """
            SELECT * FROM reservations
            WHERE user_id = $1
            AND status = 'CONFIRMED'
            AND start_time > NOW()
            ORDER BY start_time
            LIMIT 5
            """,
            user_id
        )

        # default=str handles the UUID and timestamp values in reservation rows
        await websocket.send(json.dumps({
            'type': 'reservations',
            'data': reservations
        }, default=str))

    async def broadcast(self, facility_id, message):
        """
        Broadcast availability updates to all users watching this facility
        """
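        # Get users subscribed to this facility. Only connections held by this
        # server process are reachable here; other servers must receive the
        # same message through Redis Pub/Sub.
        subscribers = await self.redis.smembers(f"subscribers:{facility_id}")

        for user_id in subscribers:
            if isinstance(user_id, bytes):
                # Redis clients return bytes unless response decoding is enabled
                user_id = user_id.decode()
            if user_id in self.connections: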
                for ws in self.connections[user_id]:
                    try:
                        await ws.send(json.dumps(message))
                    except Exception as e:
                        logger.error(f"Failed to send to {user_id}: {e}")

    async def handle_message(self, websocket, user_id, message):
        """
        Handle messages from mobile app
        """
        data = json.loads(message)

        if data['type'] == 'subscribe_facility':
            # User wants real-time updates for a facility
            facility_id = data['facility_id']
            await self.redis.sadd(f"subscribers:{facility_id}", user_id)

            # Send current availability
            availability = await self.get_facility_availability(facility_id)
            await websocket.send(json.dumps({
                'type': 'availability_update',
                'facility_id': facility_id,
                'data': availability
            }))

        elif data['type'] == 'find_parking':
            # User searching for parking nearby
            latitude = data['latitude']
            longitude = data['longitude']
            radius = data.get('radius', 5000)  # 5km default

            nearby = await self.find_nearby_facilities(latitude, longitude, radius)
            await websocket.send(json.dumps({
                'type': 'search_results',
                'data': nearby
            }))

        elif data['type'] == 'navigation_request':
            # User needs directions to their spot
            session_id = data['session_id']
            session = await self.get_session(session_id)

            navigation = await self.generate_navigation(
                session['facility_id'],
                session['spot_id']
            )

            await websocket.send(json.dumps({
                'type': 'navigation',
                'data': navigation
            }))
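
The broadcast method above sends to each socket in turn and only logs failures. A possible refinement, sketched below under assumptions, is to serialize the message once, send concurrently, and prune sockets whose send raised. LocalFanout and FakeSocket are hypothetical names; FakeSocket is a test double standing in for a real WebSocket connection, and the registry is keyed by facility rather than by user to keep the example short.

```python
import asyncio
import json
from collections import defaultdict


class LocalFanout:
    """In-process registry mapping facility_id to a set of connections."""

    def __init__(self):
        self.by_facility = defaultdict(set)

    def subscribe(self, facility_id, ws):
        self.by_facility[facility_id].add(ws)

    async def broadcast(self, facility_id, message) -> int:
        """Send to every local subscriber; return how many sends succeeded."""
        payload = json.dumps(message, default=str)  # serialize once
        sockets = list(self.by_facility.get(facility_id, ()))
        results = await asyncio.gather(
            *(ws.send(payload) for ws in sockets),
            return_exceptions=True,  # one dead socket must not abort the rest
        )
        delivered = 0
        for ws, result in zip(sockets, results):
            if isinstance(result, Exception):
                self.by_facility[facility_id].discard(ws)  # prune dead socket
            else:
                delivered += 1
        return delivered


class FakeSocket:
    """Test double: records payloads, or raises to simulate a closed socket."""

    def __init__(self, fail=False):
        self.fail = fail
        self.sent = []

    async def send(self, payload):
        if self.fail:
            raise ConnectionError("closed")
        self.sent.append(payload)
```

Pruning on failure keeps the registry from accumulating dead connections when a client drops without a clean close.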

Step 4: Wrap-up

Key Architectural Decisions

1. Redis for Real-Time Availability

  • Sorted sets scored by distance give O(log N) inserts and removals, and O(log N + M) range reads for the M nearest spots
  • Sub-second latency for availability updates
  • TTL-based session management prevents memory leaks
  • Pub/Sub for WebSocket broadcasting
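
To make the sorted-set idea concrete, here is a minimal in-memory stand-in for a set of available spots scored by distance from the entrance. It is a sketch, not Redis itself: NearestSpotIndex is a hypothetical class built on a heap with lazy deletion, and its three operations correspond roughly to the Redis commands ZADD, ZREM, and ZPOPMIN.

```python
import heapq


class NearestSpotIndex:
    """Available spots ordered by distance; nearest spot pops first."""

    def __init__(self):
        self._heap = []       # entries of (distance_m, spot_id)
        self._available = {}  # spot_id -> distance_m for live entries

    def mark_available(self, spot_id, distance_m):
        """Add or re-add a spot (O(log N))."""
        self._available[spot_id] = distance_m
        heapq.heappush(self._heap, (distance_m, spot_id))

    def mark_occupied(self, spot_id):
        """Remove a spot; its heap entry becomes stale and is skipped later."""
        self._available.pop(spot_id, None)

    def pop_nearest(self):
        """Return and remove the nearest available spot, or None if empty."""
        while self._heap:
            distance_m, spot_id = heapq.heappop(self._heap)
            if self._available.get(spot_id) == distance_m:
                del self._available[spot_id]
                return spot_id
        return None
```

For example, after adding spots at 40.0 m, 12.5 m, and 25.0 m and marking the 12.5 m spot occupied, pop_nearest returns the 25.0 m spot.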

2. PostgreSQL for Durability

  • ACID transactions for payments and reservations
  • Complex queries for analytics and reporting
  • Geospatial queries through the earthdistance extension (the ll_to_earth index in the schema), with PostGIS as a richer alternative
  • Partitioning by facility_id for scalability
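
As a reference point for the geospatial queries, the great-circle distance behind a "facilities within 5 km" search can be computed with the haversine formula. The sketch below is an application-side illustration only; in production the database index would do this filtering. The function names and the tuple layout of facilities are assumptions.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def nearby(facilities, lat, lon, radius_m=5000):
    """facilities: iterable of (facility_id, latitude, longitude).

    Returns facility IDs within radius_m, nearest first.
    """
    hits = [
        (haversine_m(lat, lon, f_lat, f_lon), facility_id)
        for facility_id, f_lat, f_lon in facilities
    ]
    return [facility_id for dist, facility_id in sorted(hits) if dist <= radius_m]
```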

3. Kafka for Event Streaming

  • Decouples services for independent scaling
  • Event sourcing for audit trails
  • Real-time analytics with Flink
  • Replay capability for debugging
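
Replay works because current state is a fold over the ordered event log. The sketch below rebuilds per-facility occupancy from entry and exit events. The event shape and the type names VEHICLE_ENTERED and VEHICLE_EXITED are hypothetical; the design does not specify an event schema.

```python
from collections import Counter


def replay_occupancy(events):
    """Fold ordered entry/exit events into {facility_id: occupied_count}."""
    occupied = Counter()
    for event in events:
        if event["type"] == "VEHICLE_ENTERED":
            occupied[event["facility_id"]] += 1
        elif event["type"] == "VEHICLE_EXITED":
            occupied[event["facility_id"]] -= 1
        # Other event types do not affect occupancy and are ignored.
    return dict(occupied)
```

Replaying the same log always yields the same counts, which is what makes the log usable for audits and for debugging a diverged cache.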

4. Edge Computing for IoT

  • Process sensor data locally to reduce latency
  • MQTT for lightweight sensor communication
  • Batch updates to reduce network traffic
  • Fallback to cloud when edge is down
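
Batching at the edge can be as simple as coalescing a burst of sensor readings into the latest state per spot before uploading. This is a minimal sketch under assumptions: the reading tuple layout and the coalesce name are hypothetical, and timestamps are taken to be comparable numbers from the edge node's clock.

```python
def coalesce(readings):
    """Keep only the latest reading per spot.

    readings: iterable of (timestamp, spot_id, occupied), in any order.
    Returns one dict per spot, sorted by spot_id.
    """
    latest = {}
    for ts, spot_id, occupied in readings:
        if spot_id not in latest or ts >= latest[spot_id][0]:
            latest[spot_id] = (ts, occupied)
    return [
        {"spot_id": spot_id, "occupied": occupied, "ts": ts}
        for spot_id, (ts, occupied) in sorted(latest.items())
    ]
```

A sensor that flickers several times within one batch window then produces a single upstream update instead of one per reading.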

Scalability Considerations

Horizontal Scaling:

  • Stateless services behind load balancers
  • Redis Cluster, which shards keys across 16,384 hash slots
  • PostgreSQL read replicas for queries
  • Kafka partitioning by facility_id

Database Sharding:

  • Shard by facility_id (co-locate related data)
  • Global tables: users, facilities
  • Cross-shard queries via aggregation service
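
Routing by facility_id needs a hash that is stable across processes and restarts, which rules out Python's built-in hash() for strings because it is salted per process. A minimal sketch, assuming a fixed shard count and a hypothetical shard_for helper:

```python
import hashlib


def shard_for(facility_id: str, num_shards: int) -> int:
    """Map a facility to a shard index in [0, num_shards), stable across runs."""
    if num_shards <= 0:
        raise ValueError("num_shards must be positive")
    digest = hashlib.sha256(facility_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards
```

Plain modulo routing remaps most facilities when num_shards changes, so a real deployment would likely add a lookup table or a consistent-hashing layer on top.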

Caching Strategy:

  • L1 (Application): In-memory LRU cache (spot metadata)
  • L2 (Redis): Shared cache for availability
  • L3 (CDN): Static assets (facility images, maps)
  • Write-through for consistency
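
The L1 and L2 layers with write-through can be sketched as follows. This is an illustration under assumptions: TwoLevelCache is a hypothetical class, and a plain dict stands in for Redis so the example runs on its own.

```python
from collections import OrderedDict


class TwoLevelCache:
    """L1: in-process LRU. L2: shared store (a dict-like stand-in for Redis)."""

    def __init__(self, l2, capacity=1000):
        self.l1 = OrderedDict()
        self.l2 = l2
        self.capacity = capacity

    def _remember(self, key, value):
        self.l1[key] = value
        self.l1.move_to_end(key)
        if len(self.l1) > self.capacity:
            self.l1.popitem(last=False)  # evict the least recently used entry

    def get(self, key):
        if key in self.l1:
            self.l1.move_to_end(key)  # refresh recency on a hit
            return self.l1[key]
        value = self.l2.get(key)
        if value is not None:
            self._remember(key, value)  # promote an L2 hit into L1
        return value

    def put(self, key, value):
        self.l2[key] = value  # write-through: update the shared layer first
        self._remember(key, value)
```

One limitation worth noting: other processes' L1 copies are not invalidated by put, so L1 suits slowly changing data such as spot metadata rather than live availability.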

Monitoring and Observability

Key Metrics:

  • Entry/exit latency (P50, P95, P99)
  • LPR accuracy rate and processing time
  • Payment success rate and retry count
  • Spot availability update latency
  • WebSocket connection count and message rate
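
For the latency metrics, P50, P95, and P99 can be computed from raw samples with the nearest-rank method. The sketch below is one simple definition among several; production metrics systems usually approximate percentiles from histograms instead of sorting raw samples.

```python
import math


def percentile(samples, p):
    """Nearest-rank percentile of a non-empty sequence, for 0 < p <= 100."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < p <= 100:
        raise ValueError("p must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]
```

With 100 gate-latency samples of 1 ms through 100 ms, this returns 50, 95, and 99 for P50, P95, and P99.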

Alerts:

  • Gate malfunction (no successful opens in 5 minutes while vehicles are waiting)
  • LPR accuracy drops below 95%
  • Payment failure rate exceeds 2%
  • Sensor offline for > 1 minute
  • Capacity reaches 95% (notify nearby alternatives)

Distributed Tracing:

  • Trace entry-to-exit flow with OpenTelemetry
  • Correlate LPR, allocation, and payment spans
  • Identify bottlenecks in critical paths

Security and Compliance

Data Protection:

  • Encrypt license plate data at rest (AES-256)
  • PII (personally identifiable information) retention: 90 days
  • Anonymize analytics data
  • GDPR right-to-delete support
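
One common way to keep plates out of analytics data is to replace them with a keyed hash. The sketch below uses HMAC-SHA256 from the standard library; the function name and the normalization rules are assumptions. Strictly speaking this is pseudonymization: anyone holding the key can recompute the mapping, so the key needs the same protection as the plates themselves.

```python
import hashlib
import hmac


def pseudonymize_plate(plate_number: str, secret_key: bytes) -> str:
    """Return a stable, non-reversible token for a license plate."""
    # Normalize so "abc-123" and "ABC 123" map to the same token.
    normalized = plate_number.replace(" ", "").replace("-", "").upper()
    return hmac.new(secret_key, normalized.encode("utf-8"), hashlib.sha256).hexdigest()
```

Because the token is stable per key, repeat visits can still be counted in analytics without storing the plate itself.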

Access Control:

  • mTLS between services
  • API authentication with OAuth 2.0
  • Role-based access control (RBAC) for admin dashboard
  • Rate limiting to prevent abuse
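
Rate limiting is often implemented as a token bucket, which allows short bursts while capping the sustained rate. The sketch below is a single-process illustration with a hypothetical TokenBucket class; a multi-server deployment would need the bucket state in a shared store. The clock is injectable so the behavior can be tested deterministically.

```python
import time


class TokenBucket:
    """Allow up to `capacity` requests in a burst, refilled at `rate_per_sec`."""

    def __init__(self, rate_per_sec, capacity, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def allow(self, cost=1.0):
        """Consume `cost` tokens if available; return whether the request passes."""
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```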

PCI DSS Compliance:

  • Tokenize credit card data (Stripe handles storage)
  • No card data in logs
  • Quarterly security audits
  • Encrypted communication for payment flows

Future Enhancements

  1. Predictive Availability: ML models to forecast parking demand
  2. Autonomous Vehicle Integration: API for self-parking cars
  3. Carbon Footprint Tracking: Encourage EV adoption with incentives
  4. Smart City Integration: Share data with traffic management systems
  5. Valet Mode: Automated valet service with robotic parking

This smart parking design targets the scale set out in Step 1 (10,000 facilities, 500,000 spots, and 5 million daily entry/exit transactions) with real-time IoT integration, intelligent allocation, dynamic pricing, and mobile synchronization. The architecture prioritizes low latency for critical operations (entry/exit), high availability for payments, and eventual consistency for availability updates, balancing performance, reliability, and cost efficiency.