Services

FenLiu's logic layer, separated from API endpoints.

Overview

Services handle core functionality:

  • Spam scoring
  • Fediverse API integration
  • Export eligibility checking
  • API key management
  • Blocklist caching
  • Background scheduling

Located in src/fenliu/services/

Spam Scoring Service

File: spam_scoring.py

Implements rule-based spam detection (0-100 scale).

Detection Rules

  1. Link Count: Posts with many links score higher
     • 5+ links = high spam indicator
     • Weight: 20 points

  2. Mention Abuse: Mass mentions of users
     • 10+ mentions = abuse indicator
     • Weight: 15 points

  3. Repetitive Content: Repeated words/phrases
     • Same content repeated = spam pattern
     • Weight: 25 points

  4. Hashtag Abuse: Excessive hashtags
     • 20+ hashtags = abuse indicator
     • Weight: 10 points

  5. All Caps: Text in all uppercase
     • 50%+ caps = suspicious
     • Weight: 10 points

  6. Suspicious Patterns: Known spam keywords
     • "Buy now", "click here", etc.
     • Weight: 15 points

  7. URL Patterns: Suspicious URL formats
     • bit.ly, shortened URLs, redirects
     • Weight: 5 points

Usage

from fenliu.services.spam_scoring import score_post
from fenliu.database import get_db
from fenliu.models import Post  # adjust to the actual Post model location

with get_db() as db:
    post = db.get(Post, post_id)
    score = score_post(post, db)  # Returns 0-100

Score Interpretation

  • 0-25: Low spam confidence, likely legitimate
  • 26-50: Low-medium spam confidence
  • 51-75: Medium-high spam confidence
  • 76-100: High spam confidence, likely spam

Fediverse Client Service

File: fediverse.py

Integrates with Mastodon/ActivityPub instances.

Main Functions

fetch_posts(hashtag, instance, limit)

  • Fetches recent posts with a hashtag
  • Queries the Mastodon API
  • Returns a list of Post objects
  • Handles rate limiting

Usage

from fenliu.services.fediverse import fetch_posts

posts = await fetch_posts(
    hashtag="python",
    instance="mastodon.social",
    limit=20
)
# posts = [Post(...), Post(...), ...]

Error Handling

  • Network timeouts: Raises exception, retryable
  • Rate limits: Respects Mastodon headers
  • Invalid instance: Returns empty list
  • Malformed data: Skips problematic posts
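Since network timeouts are documented as retryable, a caller can wrap a fetch in a simple backoff loop. The helper below is an illustrative sketch: TimeoutError stands in for the service's retryable errors, and the flaky coroutine simulates two failures before success.

```python
import asyncio

async def fetch_with_retry(fetch, attempts: int = 3, base_delay: float = 1.0):
    """Retry a retryable coroutine with exponential backoff (sketch)."""
    for attempt in range(attempts):
        try:
            return await fetch()
        except TimeoutError:  # stand-in for the service's retryable errors
            if attempt == attempts - 1:
                raise  # out of attempts, surface the error
            await asyncio.sleep(base_delay * 2 ** attempt)

# Fake fetch that fails twice, then succeeds
calls = {"n": 0}
async def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError
    return ["post"]

posts = asyncio.run(fetch_with_retry(flaky_fetch, base_delay=0))
```

Rate-limit responses should not be retried this way; per the list above, the client already respects Mastodon's rate-limit headers.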

Configuration

Via environment variables:

  • DEFAULT_INSTANCE: Default instance (mastodon.social)
  • API_TIMEOUT: Request timeout (30s)
  • RATE_LIMIT_DELAY: Delay between requests (1s)
  • MAX_POSTS_PER_FETCH: Max posts per request (20)
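A settings loader for these variables might look like the sketch below; the variable names and defaults come from the list above, while the function name and dict shape are illustrative.

```python
import os

def load_fediverse_config(env=os.environ) -> dict:
    """Read Fediverse client settings, falling back to documented defaults."""
    return {
        "default_instance": env.get("DEFAULT_INSTANCE", "mastodon.social"),
        "api_timeout": float(env.get("API_TIMEOUT", "30")),        # seconds
        "rate_limit_delay": float(env.get("RATE_LIMIT_DELAY", "1")),  # seconds
        "max_posts_per_fetch": int(env.get("MAX_POSTS_PER_FETCH", "20")),
    }
```

Passing `env` explicitly keeps the loader testable without mutating `os.environ`.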

Export Eligibility Service

File: export_eligibility.py

Determines if posts are eligible for Curated Queue export.

Eligibility Checks

Posts must pass all active filters:

  1. Author Not Blocked: Check BlockedUser list
  2. No Blocked Hashtags: Check BlockedHashtag list
  3. Has Attachments (if enabled): Post contains media

Usage

from fenliu.services.export_eligibility import check_reblog_filters

result = check_reblog_filters(post, db)

if result.eligible:
    # Post can be exported
    pass
else:
    # Reason: result.reason (e.g., "blocked user")
    pass

Return Value

@dataclass
class EligibilityResult:
    eligible: bool
    reason: str | None  # Why not eligible (if applicable)

Auto-Reject

When enabled, automatically reject posts that fail eligibility:

from fenliu.services.export_eligibility import reject_blocked_posts

reject_blocked_posts(stream_id, db)
# Posts from blocked users/hashtags are auto-rejected

API Key Service

File: api_key.py

Manages API key authentication.

Functions

generate_key()

  • Creates a new random API key
  • Stores the hashed value in the database
  • Returns the unhashed key (save this!)

validate_key(key)

  • Checks if the key is valid
  • Returns the user/app associated with the key
  • Used by the API authentication middleware

Usage

from fenliu.services.api_key import validate_key

result = validate_key("your-api-key")
if result.valid:
    # API key is valid, proceed
    pass
else:
    # Invalid key, reject request
    pass

Service Patterns

Dependency Injection

Services receive dependencies:

def score_post(post: Post, db: Session) -> int:
    # db passed in, not imported
    # Makes testing easy (mock db)
    pass

Error Handling

Services raise specific exceptions:

from fenliu.services.fediverse import FediverseError

try:
    posts = await fetch_posts(...)
except FediverseError as e:
    logger.error(f"Fetch failed: {e}")

Async Support

I/O-bound services are async:

async def fetch_posts(...) -> List[Post]:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    return posts

Compute-bound services are sync:

def score_post(post: Post, db: Session) -> int:
    # Synchronous scoring
    return score

Testing Services

Mock external dependencies:

@patch('fenliu.services.fediverse.httpx.AsyncClient')
async def test_fetch_posts(mock_client):
    mock_client.return_value.__aenter__.return_value.get = AsyncMock(
        return_value=mock_response
    )
    # Test fetch logic

Future Extensions

Machine Learning Integration

ReviewFeedback data ready for ML model training:

# Future: Train model on review decisions
model = train_spam_model(reviews_from_db)
score = model.predict(post.content)

Custom Rules

Extensible rule system for users to define custom spam patterns:

# Future: User-defined rules
rules = load_custom_rules(user_id)
score = calculate_spam_with_rules(post, rules)

Performance

Services optimized for speed:

  • Spam Scoring: <1ms per post (regex-based)
  • Fediverse Fetch: 100-500ms (network-bound)
  • Export Eligibility: <1ms per post (database lookups)
  • API Key Validation: <1ms (cached results)

Blocklist Cache Service

File: blocklist_cache.py

In-memory cache of blocked users and blocked hashtags, loaded at startup to avoid repeated database queries on every queue request.

Functions

initialize_cache(db)

  • Loads all BlockedUser and BlockedHashtag rows into memory
  • Called during application lifespan startup
  • Also called after blocklist changes (e.g., from the Settings page)

is_user_blocked(account_identifier)

  • Checks the in-memory cache using all pattern types (exact, suffix, prefix, contains)
  • Returns True if the account matches any blocked user entry

is_hashtag_blocked(hashtag)

  • Checks the in-memory cache for an exact hashtag match
  • Returns True if the hashtag is in the blocklist
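Matching the four documented pattern types could look like the sketch below; the tuple-based cache representation is illustrative, not the module's actual storage format.

```python
# Each cached entry is (pattern_type, value); the pattern types
# mirror the list above: exact, suffix, prefix, contains.
def is_user_blocked(account: str, blocked: list[tuple[str, str]]) -> bool:
    for kind, value in blocked:
        if kind == "exact" and account == value:
            return True
        if kind == "suffix" and account.endswith(value):
            return True
        if kind == "prefix" and account.startswith(value):
            return True
        if kind == "contains" and value in account:
            return True
    return False
```

Suffix patterns are what make instance-wide blocks possible: blocking `@bad.example` as a suffix matches every account on that instance.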

Scheduler Service

File: scheduler.py

Background task scheduler powered by APScheduler.

Functions

start_scheduler()

  • Called at application startup
  • Loads all active streams with enable_scheduling=True
  • Schedules periodic fetches based on each stream's fetch_interval_minutes
  • Schedules a daily cleanup of delivered posts (older than 7 days)

stop_scheduler()

  • Called at application shutdown for graceful termination

Scheduled Jobs

  Job                      Trigger                  Description
  Per-stream fetch         interval (configurable)  Fetch new posts for stream
  Delivered posts cleanup  interval (24h)           Delete posts >7 days old, record stats
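The job registration performed by start_scheduler can be sketched as pure data, independent of APScheduler itself. The stream fields below (`id`, `active`, `enable_scheduling`, `fetch_interval_minutes`) are assumptions based on the description above.

```python
from dataclasses import dataclass

@dataclass
class Job:
    name: str
    interval_minutes: int

def build_jobs(streams: list[dict]) -> list[Job]:
    """Sketch of the job set start_scheduler would register."""
    jobs = [
        Job(f"fetch:{s['id']}", s["fetch_interval_minutes"])
        for s in streams
        if s.get("active") and s.get("enable_scheduling")
    ]
    jobs.append(Job("cleanup:delivered", 24 * 60))  # daily cleanup job
    return jobs
```

In the real service each Job would be handed to APScheduler as an interval trigger; modeling the set as data first keeps the selection logic (active + scheduling-enabled streams only) easy to test.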

Adaptive Scheduling

After each fetch the scheduler adjusts the next interval:

  • >10 posts returned: interval reduced by 20% (min 15 min)
  • <2 posts returned: interval increased by 20% (max 240 min)
  • 2-10 posts: interval unchanged
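The adjustment rules above reduce to a small pure function; this is a sketch of the documented behaviour, with the 15- and 240-minute bounds exposed as parameters.

```python
def adjust_interval(posts_returned: int, interval: int,
                    min_interval: int = 15, max_interval: int = 240) -> int:
    """Adapt the next fetch interval to the yield of the last fetch."""
    if posts_returned > 10:
        return max(min_interval, round(interval * 0.8))  # busy: fetch sooner
    if posts_returned < 2:
        return min(max_interval, round(interval * 1.2))  # quiet: back off
    return interval                                      # 2-10 posts: unchanged
```

Clamping at both ends keeps busy streams from hammering the instance and quiet streams from going effectively dormant.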

Next Steps