Services¶
FenLiu's logic layer, separated from API endpoints.
Overview¶
Services handle core functionality:

- Spam scoring
- Fediverse API integration
- Export eligibility checking
- API key management
- Blocklist caching
- Background scheduling
Located in `src/fenliu/services/`.
Spam Scoring Service¶
File: `spam_scoring.py`
Implements rule-based spam detection (0-100 scale).
Detection Rules¶
- Link Count: Posts with many links score higher
    - 5+ links = high spam indicator
    - Weight: 20 points
- Mention Abuse: Mass mentions of users
    - 10+ mentions = abuse indicator
    - Weight: 15 points
- Repetitive Content: Repeated words/phrases
    - Same content repeated = spam pattern
    - Weight: 25 points
- Hashtag Abuse: Excessive hashtags
    - 20+ hashtags = abuse indicator
    - Weight: 10 points
- All Caps: Text in all uppercase
    - 50% caps = suspicious
    - Weight: 10 points
- Suspicious Patterns: Known spam keywords
    - "Buy now", "click here", etc.
    - Weight: 15 points
- URL Patterns: Suspicious URL formats
    - bit.ly, shortened URLs, redirects
    - Weight: 5 points
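Taken together, the rules form a weighted checklist. The following is an illustrative sketch of that idea, not the actual `spam_scoring.py` implementation; the helper name, regexes, and exact heuristics are assumptions based on the rule list above:

```python
import re

# Hypothetical sketch of a weighted rule checker; thresholds and weights
# mirror the documented rule list, not the real spam_scoring.py internals.
SPAM_KEYWORDS = ("buy now", "click here")

def sketch_score(content: str) -> int:
    score = 0
    if len(re.findall(r"https?://\S+", content)) >= 5:
        score += 20  # Link Count
    if content.count("@") >= 10:
        score += 15  # Mention Abuse
    words = content.lower().split()
    if words and len(set(words)) / len(words) < 0.5:
        score += 25  # Repetitive Content
    if content.count("#") >= 20:
        score += 10  # Hashtag Abuse
    letters = [c for c in content if c.isalpha()]
    if letters and sum(c.isupper() for c in letters) / len(letters) >= 0.5:
        score += 10  # All Caps
    if any(k in content.lower() for k in SPAM_KEYWORDS):
        score += 15  # Suspicious Patterns
    if "bit.ly" in content:
        score += 5   # URL Patterns
    return min(score, 100)
```

Each rule contributes its weight independently, so a post can trip several rules at once; the sum is capped at 100.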
Usage¶
```python
from fenliu.services.spam_scoring import score_post
from fenliu.database import get_db

with get_db() as db:
    post = db.get(Post, post_id)
    score = score_post(post, db)  # Returns 0-100
```
Score Interpretation¶
- 0-25: Low spam confidence, likely legitimate
- 26-50: Low-medium spam confidence
- 51-75: Medium-high spam confidence
- 76-100: High spam confidence, likely spam
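A score can be mapped to these confidence bands with a trivial helper (an illustrative sketch; the function name is hypothetical):

```python
def interpret_score(score: int) -> str:
    # Map a 0-100 spam score to one of the documented confidence bands.
    if score <= 25:
        return "low"
    if score <= 50:
        return "low-medium"
    if score <= 75:
        return "medium-high"
    return "high"
```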
Fediverse Client Service¶
File: `fediverse.py`
Integrates with Mastodon/ActivityPub instances.
Main Functions¶
`fetch_posts(hashtag, instance, limit)`

- Fetches recent posts with a hashtag
- Queries the Mastodon API
- Returns a list of Post objects
- Handles rate limiting
Usage¶
```python
from fenliu.services.fediverse import fetch_posts

posts = await fetch_posts(
    hashtag="python",
    instance="mastodon.social",
    limit=20
)
# posts = [Post(...), Post(...), ...]
```
Error Handling¶
- Network timeouts: raises an exception; the call can be retried
- Rate limits: respects Mastodon rate-limit headers
- Invalid instance: returns an empty list
- Malformed data: skips problematic posts
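Since timeouts are retryable, a caller may wrap the fetch in a retry loop with backoff. A minimal sketch, assuming a simple exponential-backoff policy (the retry wrapper is not part of the fediverse service itself):

```python
import time

def with_retries(fn, attempts=3, base_delay=1.0):
    # Retry a callable on timeout, doubling the delay after each failure.
    for attempt in range(attempts):
        try:
            return fn()
        except TimeoutError:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the timeout
            time.sleep(base_delay * 2 ** attempt)
```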
Configuration¶
Via environment:
- `DEFAULT_INSTANCE`: Default instance (`mastodon.social`)
- `API_TIMEOUT`: Request timeout (30s)
- `RATE_LIMIT_DELAY`: Delay between requests (1s)
- `MAX_POSTS_PER_FETCH`: Max posts per request (20)
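These variables can be read with standard-library fallbacks matching the documented defaults. A sketch (the helper name and returned dict shape are assumptions, not the service's actual config loader):

```python
import os

def load_fediverse_config() -> dict:
    # Read fediverse settings from the environment,
    # falling back to the documented defaults.
    return {
        "default_instance": os.environ.get("DEFAULT_INSTANCE", "mastodon.social"),
        "api_timeout": float(os.environ.get("API_TIMEOUT", "30")),
        "rate_limit_delay": float(os.environ.get("RATE_LIMIT_DELAY", "1")),
        "max_posts_per_fetch": int(os.environ.get("MAX_POSTS_PER_FETCH", "20")),
    }
```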
Export Eligibility Service¶
File: `export_eligibility.py`
Determines if posts are eligible for Curated Queue export.
Eligibility Checks¶
Posts must pass all active filters:
- Author Not Blocked: Check BlockedUser list
- No Blocked Hashtags: Check BlockedHashtag list
- Has Attachments (if enabled): Post contains media
Usage¶
```python
from fenliu.services.export_eligibility import check_reblog_filters

result = check_reblog_filters(post, db)
if result.eligible:
    # Post can be exported
    ...
else:
    # Reason: result.reason (e.g., "blocked user")
    ...
```
Return Value¶
```python
from dataclasses import dataclass

@dataclass
class EligibilityResult:
    eligible: bool
    reason: str | None  # Why not eligible (if applicable)
```
Auto-Reject¶
When enabled, automatically reject posts that fail eligibility:
```python
from fenliu.services.export_eligibility import reject_blocked_posts

reject_blocked_posts(stream_id, db)
# Posts from blocked users/hashtags are auto-rejected
```
API Key Service¶
File: `api_key.py`
Manages API key authentication.
Functions¶
`generate_key()`

- Creates a new random API key
- Stores the hashed value in the database
- Returns the unhashed key (save this!)

`validate_key(key)`

- Checks if the key is valid
- Returns the user/app associated with the key
- Used by the API authentication middleware
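Storing only the hash means a leaked database does not expose usable keys. A minimal sketch of this generate/validate pairing using the standard library (illustrative only; the in-memory set stands in for the database table, and the real `api_key.py` may differ):

```python
import hashlib
import secrets

_key_store: set[str] = set()  # stand-in for the database table

def generate_key() -> str:
    # Create a random key and store only its SHA-256 hash.
    key = secrets.token_urlsafe(32)
    _key_store.add(hashlib.sha256(key.encode()).hexdigest())
    return key  # unhashed: the caller must save this

def is_key_valid(key: str) -> bool:
    # Hash the presented key and look the hash up.
    return hashlib.sha256(key.encode()).hexdigest() in _key_store
```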
Usage¶
```python
from fenliu.services.api_key import validate_key

result = validate_key("your-api-key")
if result.valid:
    # API key is valid, proceed
    ...
else:
    # Invalid key, reject request
    ...
```
Service Patterns¶
Dependency Injection¶
Services receive dependencies:
```python
def score_post(post: Post, db: Session) -> int:
    # db is passed in, not imported,
    # which makes testing easy (mock the db)
    ...
```
Error Handling¶
Services raise specific exceptions:
```python
from fenliu.services.fediverse import FediverseError

try:
    posts = await fetch_posts(...)
except FediverseError as e:
    logger.error(f"Fetch failed: {e}")
```
Async Support¶
I/O-bound services are async:
```python
async def fetch_posts(...) -> List[Post]:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    return posts
```
Compute-bound services are sync:
```python
def score_post(post: Post, db: Session) -> int:
    # Synchronous scoring
    return score
```
Testing Services¶
Mock external dependencies:
```python
from unittest.mock import AsyncMock, patch

@patch('fenliu.services.fediverse.httpx.AsyncClient')
async def test_fetch_posts(mock_client):
    mock_client.return_value.__aenter__.return_value.get = AsyncMock(
        return_value=mock_response
    )
    # Test fetch logic
```
Future Extensions¶
Machine Learning Integration

ReviewFeedback data is ready for ML model training:

```python
# Future: Train a model on review decisions
model = train_spam_model(reviews_from_db)
score = model.predict(post.content)
```
Custom Rules

Extensible rule system for users to define custom spam patterns:

```python
# Future: User-defined rules
rules = load_custom_rules(user_id)
score = calculate_spam_with_rules(post, rules)
```
Performance¶
Services are optimized for speed:
- Spam Scoring: <1ms per post (regex-based)
- Fediverse Fetch: 100-500ms (network-bound)
- Export Eligibility: <1ms per post (database lookups)
- API Key Validation: <1ms (cached results)
Blocklist Cache Service¶
File: `blocklist_cache.py`
In-memory cache of blocked users and blocked hashtags, loaded at startup to avoid repeated database queries on every queue request.
Functions¶
`initialize_cache(db)`

- Loads all BlockedUser and BlockedHashtag rows into memory
- Called during application lifespan startup
- Also called after blocklist changes (e.g., from the Settings page)

`is_user_blocked(account_identifier)`

- Checks the in-memory cache using all pattern types (exact, suffix, prefix, contains)
- Returns True if the account matches any blocked user entry

`is_hashtag_blocked(hashtag)`

- Checks the in-memory cache for an exact hashtag match
- Returns True if the hashtag is in the blocklist
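The four pattern types could be matched along these lines. This is an illustrative sketch only: the stored-pattern syntax (using `*` as a wildcard marker) is an assumption, not necessarily how `blocklist_cache.py` encodes patterns:

```python
def matches_pattern(account: str, pattern: str) -> bool:
    # Hypothetical pattern syntax: "*x*" contains, "*x" suffix,
    # "x*" prefix, otherwise exact match.
    if pattern.startswith("*") and pattern.endswith("*"):
        return pattern.strip("*") in account
    if pattern.startswith("*"):
        return account.endswith(pattern[1:])
    if pattern.endswith("*"):
        return account.startswith(pattern[:-1])
    return account == pattern
```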
Scheduler Service¶
File: `scheduler.py`
Background task scheduler powered by APScheduler.
Functions¶
`start_scheduler()`

- Called at application startup
- Loads all active streams with `enable_scheduling=True`
- Schedules periodic fetches based on each stream's `fetch_interval_minutes`
- Schedules daily cleanup of delivered posts (older than 7 days)

`stop_scheduler()`

- Called at application shutdown for graceful termination
Scheduled Jobs¶
| Job | Trigger | Description |
|---|---|---|
| Per-stream fetch | interval (configurable) | Fetch new posts for stream |
| Delivered posts cleanup | interval (24h) | Delete posts >7 days old, record stats |
Adaptive Scheduling¶
After each fetch the scheduler adjusts the next interval:
- >10 posts returned: interval reduced by 20% (min 15 min)
- <2 posts returned: interval increased by 20% (max 240 min)
- 2-10 posts: interval unchanged
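The adjustment rule above can be expressed as a small pure function (a sketch under the documented rules; the real `scheduler.py` may structure this differently):

```python
def next_interval(current_minutes: float, posts_fetched: int) -> float:
    # Shrink the interval for busy streams, grow it for quiet ones,
    # clamped to the 15-240 minute range.
    if posts_fetched > 10:
        return max(15.0, current_minutes * 0.8)
    if posts_fetched < 2:
        return min(240.0, current_minutes * 1.2)
    return current_minutes
```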
Next Steps¶
- System Design - Architecture overview
- Database Models - Data structures
- Testing Guide - Testing patterns