+ const listener = message => {
+ const { event, payload, queued_at } = message;
- log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
- output(event, encodedPayload);
- };
+ const transmit = () => {
+ const now = new Date().getTime();
+ const delta = now - queued_at;
+ const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
- // The listener used to process each message off the redis subscription,
- // message here is an object with an `event` and `payload` property. Some
- // events also include a queued_at value, but this is being removed shortly.
- /** @type {SubscriptionListener} */
- const listener = message => {
- const { event, payload } = message;
+ log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
+ output(event, encodedPayload);
+ };
// Only send local-only statuses to logged-in users
if (payload.local_only && !req.accountId) {
@@ -659,42 +651,29 @@ const startWorker = async (workerId) => {
return;
}
- // Streaming only needs to apply filtering to some channels and only to
- // some events. This is because majority of the filtering happens on the
- // Ruby on Rails side when producing the event for streaming.
- //
- // The only events that require filtering from the streaming server are
- // `update` and `status.update`, all other events are transmitted to the
- // client as soon as they're received (pass-through).
- //
- // The channels that need filtering are determined in the function
- // `channelNameToIds` defined below:
- if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
- transmit(event, payload);
+ // Only messages that may require filtering are statuses, since notifications
+ // are already personalized and deletes do not matter
+ if (!needsFiltering || event !== 'update') {
+ transmit();
return;
}
- // The rest of the logic from here on in this function is to handle
- // filtering of statuses:
+ const unpackedPayload = payload;
+ const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
+ const accountDomain = unpackedPayload.account.acct.split('@')[1];
- // Filter based on language:
- if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
- log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
+ if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
+ log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
return;
}
// When the account is not logged in, it is not necessary to confirm the block or mute
if (!req.accountId) {
- transmit(event, payload);
+ transmit();
return;
}
- // Filter based on domain blocks, blocks, mutes, or custom filters:
- const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
- const accountDomain = payload.account.acct.split('@')[1];
-
- // TODO: Move this logic out of the message handling loop
- pgPool.connect((err, client, releasePgConnection) => {
+ pgPool.connect((err, client, done) => {
if (err) {
log.error(err);
return;
@@ -709,57 +688,40 @@ const startWorker = async (workerId) => {
SELECT 1
FROM mutes
WHERE account_id = $1
- AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
+ AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
];
if (accountDomain) {
queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
}
- if (!payload.filtered && !req.cachedFilters) {
+ if (!unpackedPayload.filtered && !req.cachedFilters) {
queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
}
Promise.all(queries).then(values => {
- releasePgConnection();
+ done();
- // Handling blocks & mutes and domain blocks: If one of those applies,
- // then we don't transmit the payload of the event to the client
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
return;
}
- // If the payload already contains the `filtered` property, it means
- // that filtering has been applied on the ruby on rails side, as
- // such, we don't need to construct or apply the filters in streaming:
- if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
- transmit(event, payload);
- return;
- }
-
- // Handling for constructing the custom filters and caching them on the request
- // TODO: Move this logic out of the message handling lifecycle
- if (!req.cachedFilters) {
+ if (!unpackedPayload.filtered && !req.cachedFilters) {
const filterRows = values[accountDomain ? 2 : 1].rows;
- req.cachedFilters = filterRows.reduce((cache, filter) => {
- if (cache[filter.id]) {
- cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
+ req.cachedFilters = filterRows.reduce((cache, row) => {
+ if (cache[row.id]) {
+ cache[row.id].keywords.push([row.keyword, row.whole_word]);
} else {
- cache[filter.id] = {
- keywords: [[filter.keyword, filter.whole_word]],
- expires_at: filter.expires_at,
- filter: {
- id: filter.id,
- title: filter.title,
- context: filter.context,
- expires_at: filter.expires_at,
- // filter.filter_action is the value from the
- // custom_filters.action database column, it is an integer
- // representing a value in an enum defined by Ruby on Rails:
- //
- // enum { warn: 0, hide: 1 }
- filter_action: ['warn', 'hide'][filter.filter_action],
+ cache[row.id] = {
+ keywords: [[row.keyword, row.whole_word]],
+ expires_at: row.expires_at,
+ repr: {
+ id: row.id,
+ title: row.title,
+ context: row.context,
+ expires_at: row.expires_at,
+ filter_action: ['warn', 'hide'][row.filter_action],
},
};
}
@@ -767,10 +729,6 @@ const startWorker = async (workerId) => {
return cache;
}, {});
- // Construct the regular expressions for the custom filters: This
- // needs to be done in a separate loop as the database returns one
- // filterRow per keyword, so we need all the keywords before
- // constructing the regular expression
Object.keys(req.cachedFilters).forEach((key) => {
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
@@ -790,58 +748,31 @@ const startWorker = async (workerId) => {
});
}
- // Apply cachedFilters against the payload, constructing a
- // `filter_results` array of FilterResult entities
- if (req.cachedFilters) {
- const status = payload;
- // TODO: Calculate searchableContent in Ruby on Rails:
- const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/
/g, '\n').replace(/<\/p>/g, '\n\n');
- const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
+ // Check filters
+ if (req.cachedFilters && !unpackedPayload.filtered) {
+ const status = unpackedPayload;
+ const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/
/g, '\n').replace(/<\/p>
/g, '\n\n');
+ const searchIndex = JSDOM.fragment(searchContent).textContent;
const now = new Date();
- const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
- // Check the filter hasn't expired before applying:
- if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
- return results;
- }
-
- // Just in-case JSDOM fails to find textContent in searchableContent
- if (!searchableTextContent) {
- return results;
- }
-
- const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
- if (keyword_matches) {
- // results is an Array of FilterResult; status_matches is always
- // null as we only are only applying the keyword-based custom
- // filters, not the status-based custom filters.
- // https://docs.joinmastodon.org/entities/FilterResult/
- results.push({
- filter: cachedFilter.filter,
- keyword_matches,
- status_matches: null
- });
+ payload.filtered = [];
+ Object.values(req.cachedFilters).forEach((cachedFilter) => {
+ if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
+ const keyword_matches = searchIndex.match(cachedFilter.regexp);
+ if (keyword_matches) {
+ payload.filtered.push({
+ filter: cachedFilter.repr,
+ keyword_matches,
+ });
+ }
}
-
- return results;
- }, []);
-
- // Send the payload + the FilterResults as the `filtered` property
- // to the streaming connection. To reach this code, the `event` must
- // have been either `update` or `status.update`, meaning the
- // `payload` is a Status entity, which has a `filtered` property:
- //
- // filtered: https://docs.joinmastodon.org/entities/Status/#filtered
- transmit(event, {
- ...payload,
- filtered: filter_results
});
- } else {
- transmit(event, payload);
}
+
+ transmit();
}).catch(err => {
- releasePgConnection();
log.error(err);
+ done();
});
});
};
@@ -850,7 +781,7 @@ const startWorker = async (workerId) => {
subscribe(`${redisPrefix}${id}`, listener);
});
- if (typeof attachCloseHandler === 'function') {
+ if (attachCloseHandler) {
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
}
@@ -887,13 +818,12 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {function(): void} [closeHandler]
- * @returns {function(string[], SubscriptionListener): void}
+ * @return {function(string[]): void}
*/
-
- const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
+ const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
req.on('close', () => {
ids.forEach(id => {
- unsubscribe(id, listener);
+ unsubscribe(id);
});
if (closeHandler) {
@@ -1153,7 +1083,7 @@ const startWorker = async (workerId) => {
* @typedef WebSocketSession
* @property {any} socket
* @property {any} request
- * @property {Object.} subscriptions
+ * @property {Object.} subscriptions
*/
/**