@@ -226,9 +226,15 @@ const startWorker = async (workerId) => {
callbacks . forEach ( callback => callback ( json ) ) ;
} ;
/ * *
* @ callback SubscriptionListener
* @ param { ReturnType < parseJSON > } json of the message
* @ returns void
* /
/ * *
* @ param { string } channel
* @ param { function ( string ) : void } callback
* @ param { SubscriptionListener } callback
* /
const subscribe = ( channel , callback ) => {
log . silly ( ` Adding listener for ${ channel } ` ) ;
@@ -245,7 +251,7 @@ const startWorker = async (workerId) => {
/ * *
* @ param { string } channel
* @ param { function ( Object < string , any > ) : void } callback
* @ param { SubscriptionListener } callback
* /
const unsubscribe = ( channel , callback ) => {
log . silly ( ` Removing listener for ${ channel } ` ) ;
@@ -623,27 +629,29 @@ const startWorker = async (workerId) => {
* @ param { string [ ] } ids
* @ param { any } req
* @ param { function ( string , string ) : void } output
* @ param { function ( string [ ] , function ( string ) : void ) : void } attachCloseHandler
* @ param { undefined | function ( string [ ] , SubscriptionListener ) : void } attachCloseHandler
* @ param { boolean = } needsFiltering
* @ returns { function ( object ) : void }
* @ returns { SubscriptionListener }
* /
const streamFrom = ( ids , req , output , attachCloseHandler , needsFiltering = false ) => {
const accountId = req . accountId || req . remoteAddress ;
log . verbose ( req . requestId , ` Starting stream from ${ ids . join ( ', ' ) } for ${ accountId } ` ) ;
// Currently message is of type string, soon it'll be Record<string, any>
const listener = message => {
const { event , payload , queued _at } = message ;
const transmit = ( event , payload ) => {
// TODO: Replace "string"-based delete payloads with object payloads:
const encodedPayload = typeof payload === 'object' ? JSON . stringify ( payload ) : payload ;
const transmit = ( ) => {
const now = new Date ( ) . getTime ( ) ;
const delta = now - queued _at ;
const encodedPayload = typeof payload === 'object' ? JSON . stringify ( payload ) : payload ;
log . silly ( req . requestId , ` Transmitting for ${ accountId } : ${ event } ${ encodedPayload } ` ) ;
output ( event , encodedPayload ) ;
} ;
log . silly ( req . requestId , ` Transmitting for ${ accountId } : ${ event } ${ encodedPayload } Delay: ${ delta } ms ` ) ;
output ( event , encodedPayload ) ;
} ;
// The listener used to process each message off the redis subscription,
// message here is an object with an `event` and `payload` property. Some
// events also include a queued_at value, but this is being removed shortly.
/** @type {SubscriptionListener} */
const listener = message => {
const { event , payload } = message ;
// Only send local-only statuses to logged-in users
if ( payload . local _only && ! req . accountId ) {
@@ -651,29 +659,42 @@ const startWorker = async (workerId) => {
return ;
}
// Only messages that may require filtering are statuses, since notifications
// are already personalized and deletes do not matter
if ( ! needsFiltering || event !== 'update' ) {
transmit ( ) ;
// Streaming only needs to apply filtering to some channels and only to
// some events. This is because majority of the filtering happens on the
// Ruby on Rails side when producing the event for streaming.
//
// The only events that require filtering from the streaming server are
// `update` and `status.update`, all other events are transmitted to the
// client as soon as they're received (pass-through).
//
// The channels that need filtering are determined in the function
// `channelNameToIds` defined below:
if ( ! needsFiltering || ( event !== 'update' && event !== 'status.update' ) ) {
transmit ( event , payload ) ;
return ;
}
const unpackedPayload = payload ;
const targetAccountIds = [ unpackedPayload . account . id ] . concat ( unpackedPayload . mentions . map ( item => item . id ) ) ;
const accountDomain = unpackedPayload . account . acct . split ( '@' ) [ 1 ] ;
// The rest of the logic from here on in this function is to handle
// filtering of statuses:
if ( Array . isArray ( req . chosenLanguages ) && unpackedPayload . language !== null && req . chosenLanguages . indexOf ( unpackedPayload . language ) === - 1 ) {
log . silly ( req . requestId , ` Message ${ unpackedPayload . id } filtered by language ( ${ unpackedPayload . language } ) ` ) ;
// Filter based on language:
if ( Array . isArray ( req . chosenLanguages ) && payload . language !== null && req . chosenLanguages . indexOf ( payload . language ) === - 1 ) {
log . silly ( req . requestId , ` Message ${ payload . id } filtered by language ( ${ payload . language } ) ` ) ;
return ;
}
// When the account is not logged in, it is not necessary to confirm the block or mute
if ( ! req . accountId ) {
transmit ( ) ;
transmit ( event , payload ) ;
return ;
}
pgPool . connect ( ( err , client , done ) => {
// Filter based on domain blocks, blocks, mutes, or custom filters:
const targetAccountIds = [ payload . account . id ] . concat ( payload . mentions . map ( item => item . id ) ) ;
const accountDomain = payload . account . acct . split ( '@' ) [ 1 ] ;
// TODO: Move this logic out of the message handling loop
pgPool . connect ( ( err , client , releasePgConnection ) => {
if ( err ) {
log . error ( err ) ;
return ;
@@ -688,40 +709,57 @@ const startWorker = async (workerId) => {
SELECT 1
FROM mutes
WHERE account _id = $1
AND target _account _id IN ( $ { placeholders ( targetAccountIds , 2 ) } ) ` , [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
AND target _account _id IN ( $ { placeholders ( targetAccountIds , 2 ) } ) ` , [req.accountId, payload.account.id].concat(targetAccountIds)),
] ;
if ( accountDomain ) {
queries . push ( client . query ( 'SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2' , [ req . accountId , accountDomain ] ) ) ;
}
if ( ! unpackedPayload . filtered && ! req . cachedFilters ) {
if ( ! payload . filtered && ! req . cachedFilters ) {
queries . push ( client . query ( 'SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())' , [ req . accountId ] ) ) ;
}
Promise . all ( queries ) . then ( values => {
done ( ) ;
releasePgConnection ( ) ;
// Handling blocks & mutes and domain blocks: If one of those applies,
// then we don't transmit the payload of the event to the client
if ( values [ 0 ] . rows . length > 0 || ( accountDomain && values [ 1 ] . rows . length > 0 ) ) {
return ;
}
if ( ! unpackedPayload . filtered && ! req . cachedFilters ) {
// If the payload already contains the `filtered` property, it means
// that filtering has been applied on the ruby on rails side, as
// such, we don't need to construct or apply the filters in streaming:
if ( Object . prototype . hasOwnProperty . call ( payload , "filtered" ) ) {
transmit ( event , payload ) ;
return ;
}
// Handling for constructing the custom filters and caching them on the request
// TODO: Move this logic out of the message handling lifecycle
if ( ! req . cachedFilters ) {
const filterRows = values [ accountDomain ? 2 : 1 ] . rows ;
req . cachedFilters = filterRows . reduce ( ( cache , row ) => {
if ( cache [ row . id ] ) {
cache [ row . id ] . keywords . push ( [ row . keyword , row . whole _word ] ) ;
req . cachedFilters = filterRows . reduce ( ( cache , filter ) => {
if ( cache [ filter . id ] ) {
cache [ filter . id ] . keywords . push ( [ filter . keyword , filter . whole _word ] ) ;
} else {
cache [ row . id ] = {
keywords : [ [ row . keyword , row . whole _word ] ] ,
expires _at : row . expires _at ,
repr : {
id : row . id ,
title : row . title ,
context : row . context ,
expires _at : row . expires _at ,
filter _action : [ 'warn' , 'hide' ] [ row . filter _action ] ,
cache [ filter . id ] = {
keywords : [ [ filter . keyword , filter . whole _word ] ] ,
expires _at : filter . expires _at ,
filter : {
id : filter . id ,
title : filter . title ,
context : filter . context ,
expires _at : filter . expires _at ,
// filter.filter_action is the value from the
// custom_filters.action database column, it is an integer
// representing a value in an enum defined by Ruby on Rails:
//
// enum { warn: 0, hide: 1 }
filter _action : [ 'warn' , 'hide' ] [ filter . filter _action ] ,
} ,
} ;
}
@@ -729,6 +767,10 @@ const startWorker = async (workerId) => {
return cache ;
} , { } ) ;
// Construct the regular expressions for the custom filters: This
// needs to be done in a separate loop as the database returns one
// filterRow per keyword, so we need all the keywords before
// constructing the regular expression
Object . keys ( req . cachedFilters ) . forEach ( ( key ) => {
req . cachedFilters [ key ] . regexp = new RegExp ( req . cachedFilters [ key ] . keywords . map ( ( [ keyword , whole _word ] ) => {
let expr = keyword . replace ( /[.*+?^${}()|[\]\\]/g , '\\$&' ) ;
@@ -748,31 +790,58 @@ const startWorker = async (workerId) => {
} ) ;
}
// Check filters
if ( req . cachedFilters && ! unpackedPayload . filtered ) {
const status = unpackedPayload ;
const searchContent = ( [ status . spoiler _text || '' , status . content ] . concat ( ( status . poll && status . poll . options ) ? status . poll . options . map ( option => option . title ) : [ ] ) ) . concat ( status . media _attachments . map ( att => att . description ) ) . join ( '\n\n' ) . replace ( /<br\s*\/?>/g , '\n' ) . replace ( /<\/p><p>/g , '\n\n' ) ;
const searchIndex = JSDOM . fragment ( searchContent ) . textContent ;
// Apply cachedFilters against the payload, constructing a
// `filter_results` array of FilterResult entities
if ( req . cachedFilters ) {
const status = payload ;
// TODO: Calculate searchableContent in Ruby on Rails:
const searchableContent = ( [ status . spoiler _text || '' , status . content ] . concat ( ( status . poll && status . poll . options ) ? status . poll . options . map ( option => option . title ) : [ ] ) ) . concat ( status . media _attachments . map ( att => att . description ) ) . join ( '\n\n' ) . replace ( /<br\s*\/?>/g , '\n' ) . replace ( /<\/p><p>/g , '\n\n' ) ;
const searchableTextContent = JSDOM . fragment ( searchableContent ) . textContent ;
const now = new Date ( ) ;
payload . filtered = [ ] ;
Object . values ( req . cachedFilters ) . forEach ( ( cachedFilter ) => {
if ( ( cachedFilter . expires _at === null || cachedFilter . expires _at > now ) ) {
const keyword _matches = searchIndex . match ( cachedFilter . regexp ) ;
if ( keyword _matches ) {
payload . filtered . push ( {
filter : cachedFilter . repr ,
keyword _matches ,
} ) ;
}
const filter _results = Object . values ( req . cachedFilters ) . reduce ( ( results , cachedFilter ) => {
// Check the filter hasn't expired before applying:
if ( cachedFilter . expires _at !== null && cachedFilter . expires _at < now ) {
return results ;
}
// Just in-case JSDOM fails to find textContent in searchableContent
if ( ! searchableTextContent ) {
return results ;
}
const keyword _matches = searchableTextContent . match ( cachedFilter . regexp ) ;
if ( keyword _matches ) {
// results is an Array of FilterResult; status_matches is always
// null as we only are only applying the keyword-based custom
// filters, not the status-based custom filters.
// https://docs.joinmastodon.org/entities/FilterResult/
results . push ( {
filter : cachedFilter . filter ,
keyword _matches ,
status _matches : null
} ) ;
}
return results ;
} , [ ] ) ;
// Send the payload + the FilterResults as the `filtered` property
// to the streaming connection. To reach this code, the `event` must
// have been either `update` or `status.update`, meaning the
// `payload` is a Status entity, which has a `filtered` property:
//
// filtered: https://docs.joinmastodon.org/entities/Status/#filtered
transmit ( event , {
... payload ,
filtered : filter _results
} ) ;
} else {
transmit ( event , payload ) ;
}
transmit ( ) ;
} ) . catch ( err => {
releasePgConnection ( ) ;
log . error ( err ) ;
done ( ) ;
} ) ;
} ) ;
} ;
@@ -781,7 +850,7 @@ const startWorker = async (workerId) => {
subscribe ( ` ${ redisPrefix } ${ id } ` , listener ) ;
} ) ;
if ( attachCloseHandler ) {
if ( typeof attachCloseHandler === 'function' ) {
attachCloseHandler ( ids . map ( id => ` ${ redisPrefix } ${ id } ` ) , listener ) ;
}
@@ -818,12 +887,13 @@ const startWorker = async (workerId) => {
/ * *
* @ param { any } req
* @ param { function ( ) : void } [ closeHandler ]
* @ return { function ( string [ ] ) : void }
* @ returns { function ( string [ ] , SubscriptionListener ) : void }
* /
const streamHttpEnd = ( req , closeHandler = undefined ) => ( ids ) => {
const streamHttpEnd = ( req , closeHandler = undefined ) => ( ids , listener ) => {
req . on ( 'close' , ( ) => {
ids . forEach ( id => {
unsubscribe ( id ) ;
unsubscribe ( id , listener ) ;
} ) ;
if ( closeHandler ) {
@@ -1083,7 +1153,7 @@ const startWorker = async (workerId) => {
* @ typedef WebSocketSession
* @ property { any } socket
* @ property { any } request
* @ property { Object . < string , { listener : function ( string ) : void , stopHeartbeat : function ( ) : void } > } subscriptions
* @ property { Object . < string , { listener : SubscriptionListener , stopHeartbeat : function ( ) : void } > } subscriptions
* /
/ * *