import { Hono } from 'hono';
import { stream } from 'hono/streaming';
import type { StatusCode } from 'hono/utils/http-status';
import { existsSync, mkdirSync, writeFileSync, readFileSync, unlinkSync } from 'fs';
import { join, basename, resolve } from 'path';
import { uploadFiles, checkRepoAccess, createRepo, whoAmI } from '@huggingface/hub';
import type { RepoDesignation } from '@huggingface/hub';

const app = new Hono();

const TARGET_BASE_URL = process.env.TARGET_BASE_URL || "https://router.huggingface.co";
const PORT = parseInt(process.env.PORT || '4040', 10);
const LOGS_DIR = process.env.LOGS_DIR || './logs';
const HF_ACCESS_TOKEN = process.env.HF_API_KEY || '';
const DATASET_PRIVATE = (process.env.DATASET_PRIVATE || 'false').toLowerCase() === 'true';

/*
  USER_NAME - the Hugging Face username that owns the trace dataset.
  Requests are rejected unless the configured access token belongs to this user.
*/
const USER_NAME = process.env.USER_NAME || 'cfahlgren1';

if (!HF_ACCESS_TOKEN) {
  console.error('Please set the HF_API_KEY environment variable');
  process.exit(1);
}

if (!USER_NAME) {
  console.error('Please set the USER_NAME environment variable');
  process.exit(1);
}

/*
  BATCH_SIZE_LIMIT - the maximum number of traces to buffer before pushing to the dataset
  BATCH_TIME_LIMIT - the maximum time (in minutes) to wait before pushing to the dataset
  Whichever limit is reached first triggers a push.
*/
const BATCH_SIZE_LIMIT = parseInt(process.env.BATCH_SIZE_LIMIT || '100', 10);
const BATCH_TIME_LIMIT = parseInt(process.env.BATCH_TIME_LIMIT || '1', 10); // 1 minute default

if (!existsSync(LOGS_DIR)) {
  mkdirSync(LOGS_DIR, { recursive: true });
}
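
// Verify that the configured token belongs to the expected user: whoAmI() resolves the
// token to an account name, which is compared against USER_NAME.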
async function checkUserAccess(username: string): Promise<boolean> {
  const response = await whoAmI({ accessToken: HF_ACCESS_TOKEN });
  return response.name === username;
}
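
// In-memory buffer of request traces. It is flushed to local JSON files (and, when a token
// is available, uploaded to a Hugging Face dataset) once BATCH_SIZE_LIMIT or BATCH_TIME_LIMIT is hit.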
const requestTraces: {
  model?: string;
  timestamp_start: string;
  timestamp_end?: string;
  messages?: any[];
  prompt_tokens?: number;
  completion_tokens?: number;
  response?: string;
  arguments?: any;
  provider?: string;
  duration_ms?: number;
}[] = [];

let lastTraceBatchTime = Date.now();
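
// Returns true if the dataset repo already exists and is accessible with the configured token.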
async function checkDatasetExists(datasetName: string): Promise<boolean> {
  try {
    if (!HF_ACCESS_TOKEN) {
      console.warn('HF_ACCESS_TOKEN not set, skipping dataset check');
      return false;
    }
    const repo: RepoDesignation = { type: 'dataset', name: datasetName };
    await checkRepoAccess({ repo, accessToken: HF_ACCESS_TOKEN });
    return true;
  } catch (error) {
    return false;
  }
}
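
// Create the trace dataset with a minimal README so it is tagged as an inference-proxy dataset.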
async function createDataset(datasetName: string): Promise<boolean> {
  try {
    if (!HF_ACCESS_TOKEN) {
      console.warn('HF_ACCESS_TOKEN not set, skipping dataset creation');
      return false;
    }
    const repo: RepoDesignation = { type: 'dataset', name: datasetName };
    await createRepo({
      repo,
      accessToken: HF_ACCESS_TOKEN,
      private: DATASET_PRIVATE,
      files: [
        {
          path: 'README.md',
          content: new Blob([`---
tags:
- inference-proxy
---`]),
        }
      ]
    });
    return true;
  } catch (error) {
    console.error('Error creating dataset:', error);
    return false;
  }
}
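
// Persist a single trace as a JSON file under LOGS_DIR, named
// <start-epoch-ms>_<model with '/' replaced by '_'>.json.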
function writeTraceToFile(trace: typeof requestTraces[0]): string {
  try {
    const timestamp = new Date(trace.timestamp_start).getTime();
    const model = trace.model || 'unknown';
    const filename = `${timestamp}_${model.replace(/\//g, '_')}.json`;
    const filePath = join(LOGS_DIR, filename);
    writeFileSync(filePath, JSON.stringify(trace, null, 2));
    return filePath;
  } catch (error) {
    console.error('Error writing trace to file:', error);
    return '';
  }
}
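
// Upload a local trace file to the dataset under the traces/ prefix.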
async function uploadTraceFile(filePath: string, datasetName: string): Promise<boolean> {
  try {
    if (!HF_ACCESS_TOKEN) {
      console.warn('HF_ACCESS_TOKEN not set, skipping file upload');
      return false;
    }
    const repo: RepoDesignation = { type: 'dataset', name: datasetName };
    const fileName = basename(filePath);
    const uploadPath = `traces/${fileName}`;
    await uploadFiles({
      repo,
      accessToken: HF_ACCESS_TOKEN,
      files: [
        {
          path: uploadPath,
          content: new Blob([readFileSync(filePath)]),
        },
      ],
    });
    return true;
  } catch (error) {
    console.error('Error uploading trace file:', error);
    return false;
  }
}
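
// Flush the in-memory buffer: write each trace to a local JSON file, then upload the files
// to the <username>/traces dataset (creating it on first use) and delete the local copies.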
async function writeBatchedTraces() {
  if (requestTraces.length === 0) {
    return;
  }

  const tracesToWrite = [...requestTraces];
  const batchSize = tracesToWrite.length;
  requestTraces.length = 0;
  lastTraceBatchTime = Date.now();

  console.log(`Processing batch of ${batchSize} traces...`);

  // write traces to local files first
  const filePaths: string[] = [];
  for (const trace of tracesToWrite) {
    const filePath = writeTraceToFile(trace);
    if (filePath) {
      filePaths.push(filePath);
    }
  }

  if (HF_ACCESS_TOKEN) {
    const response = await whoAmI({ accessToken: HF_ACCESS_TOKEN });
    const datasetName = `${response.name}/traces`;

    // Check if dataset exists, create if not
    const exists = await checkDatasetExists(datasetName);
    if (!exists) {
      console.log(`Dataset ${datasetName} does not exist, creating...`);
      const created = await createDataset(datasetName);
      if (!created) {
        console.error(`Failed to create dataset ${datasetName}`);
      } else {
        console.log(`Successfully created dataset ${datasetName}`);
      }
    }

    // Upload files to dataset
    for (const filePath of filePaths) {
      const uploaded = await uploadTraceFile(filePath, datasetName);

      // Clean up local file if uploaded successfully
      if (uploaded && existsSync(filePath)) {
        unlinkSync(filePath);
        console.log(`Deleted local file ${filePath} after successful upload`);
      }
    }
  } else {
    console.log(`HF_ACCESS_TOKEN not set, keeping ${filePaths.length} local files`);
  }

  console.log(`Successfully processed ${batchSize} traces.`);
}
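
// Time-based flush: poll regularly and flush once the buffer has been waiting longer than
// BATCH_TIME_LIMIT. The polling interval is capped at 10 seconds so short limits are honored.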
setInterval(() => {
  const timeSinceLastBatch = Date.now() - lastTraceBatchTime;
  if (timeSinceLastBatch >= BATCH_TIME_LIMIT * 60 * 1000 && requestTraces.length > 0) {
    console.log(`Time limit reached (${BATCH_TIME_LIMIT} minutes). Flushing ${requestTraces.length} traces.`);
    writeBatchedTraces().catch(err => {
      console.error('Error flushing traces:', err);
    });
  }
}, Math.min(BATCH_TIME_LIMIT * 60 * 1000, 10000)); // Check at least every 10 seconds
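
// Size-based flush: returns true if the buffer hit BATCH_SIZE_LIMIT and a flush was started.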
function checkAndFlushTraces() {
  if (requestTraces.length >= BATCH_SIZE_LIMIT) {
    console.log(`Batch size limit reached (${BATCH_SIZE_LIMIT}). Flushing traces.`);
    writeBatchedTraces().catch(err => {
      console.error('Error flushing traces:', err);
    });
    return true;
  }
  return false;
}
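
// Landing page: serve index.html with the proxy's target and host URLs substituted in.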
app.get('/', (c) => {
  const hostUrl = new URL(c.req.url).origin;
  try {
    let html = readFileSync('index.html', 'utf8');

    // Replace template variables
    html = html.replace(/{{TARGET_BASE_URL}}/g, TARGET_BASE_URL)
               .replace(/{{HOST_URL}}/g, hostUrl);

    return c.html(html);
  } catch (error) {
    console.error('Error reading index.html:', error);
    return c.text('Hono forwarding proxy running!', 500);
  }
});
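
// Consume the tee'd copy of the upstream response in the background and record it on the trace.
// SSE responses have their delta contents accumulated into one string; JSON responses have the
// message content and usage counts extracted. When no usage block is available, completion_tokens
// falls back to a character-count approximation rather than a real token count.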
async function storeStreamedResponse(streamToLog: ReadableStream<Uint8Array>, contentType: string | null, targetUrl: string, traceIndex: number) {
  const reader = streamToLog.getReader();
  const chunks: Uint8Array[] = [];

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      if (value) chunks.push(value);
    }

    const blob = new Blob(chunks);
    const bodyText = await blob.text();
    contentType = contentType?.toLowerCase() || '';

    // Handle event streams (streaming responses)
    if (contentType.includes('text/event-stream')) {
      const lines = bodyText.split('\n');
      let accumulatedContent = '';

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const jsonData = line.substring(5).trim();
          if (jsonData && jsonData !== '[DONE]') {
            try {
              const parsed = JSON.parse(jsonData);
              if (parsed.choices && parsed.choices[0]?.delta?.content) {
                accumulatedContent += parsed.choices[0].delta.content;
              }
            } catch (parseError) {
              // Continue processing other lines
            }
          }
        }
      }

      if (accumulatedContent) {
        requestTraces[traceIndex].response = accumulatedContent;
        requestTraces[traceIndex].completion_tokens = accumulatedContent.length;
      }
    } else {
      try {
        const jsonResponse = JSON.parse(bodyText);

        // Get response content from standard LLM response formats
        requestTraces[traceIndex].response = jsonResponse.choices?.[0]?.message?.content ||
          jsonResponse.generated_text ||
          bodyText;

        // Get token counts if available
        if (jsonResponse.usage) {
          if (jsonResponse.usage.completion_tokens !== undefined) {
            requestTraces[traceIndex].completion_tokens = jsonResponse.usage.completion_tokens;
          }
          if (jsonResponse.usage.prompt_tokens !== undefined) {
            requestTraces[traceIndex].prompt_tokens = jsonResponse.usage.prompt_tokens;
          }
        }
      } catch (e) {
        // If not JSON, use bodyText as is
        requestTraces[traceIndex].response = bodyText;
        requestTraces[traceIndex].completion_tokens = bodyText.length;
      }
    }

    // Set the end timestamp after processing
    requestTraces[traceIndex].timestamp_end = new Date().toISOString();

    // Calculate duration if we have both timestamps
    if (requestTraces[traceIndex].timestamp_start && requestTraces[traceIndex].timestamp_end) {
      const startTime = new Date(requestTraces[traceIndex].timestamp_start).getTime();
      const endTime = new Date(requestTraces[traceIndex].timestamp_end).getTime();
      requestTraces[traceIndex].duration_ms = endTime - startTime;
    }

    checkAndFlushTraces();
  } catch (error) {
    requestTraces[traceIndex].timestamp_end = new Date().toISOString();

    // Calculate duration if we have both timestamps
    if (requestTraces[traceIndex].timestamp_start && requestTraces[traceIndex].timestamp_end) {
      const startTime = new Date(requestTraces[traceIndex].timestamp_start).getTime();
      const endTime = new Date(requestTraces[traceIndex].timestamp_end).getTime();
      requestTraces[traceIndex].duration_ms = endTime - startTime;
    }

    checkAndFlushTraces();
  } finally {
    try {
      reader.releaseLock();
    } catch {
      // Ignore release errors
    }
  }
}
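
// Catch-all proxy route: authorize the caller, build a trace from the request body, forward the
// request to TARGET_BASE_URL, and tee the response so one copy streams back to the client while
// the other is recorded in the background.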
app.all('*', async (c) => {
  try {
    // check if the user is authorized to access the dataset
    if (USER_NAME && !await checkUserAccess(USER_NAME)) {
      return c.text('Unauthorized', 401);
    }

    const url = new URL(c.req.url);
    const targetPath = url.pathname;
    const targetUrl = `${TARGET_BASE_URL}${targetPath}${url.search}`;

    // Skip trace creation for favicon and other common browser requests
    const skipTracePatterns = ['/favicon.ico', '/robots.txt'];
    const shouldSkipTrace = skipTracePatterns.some(pattern => targetPath.includes(pattern));

    // Extract provider from the URL path
    const pathParts = targetPath.split('/');
    const provider = pathParts.length > 1 ? pathParts[1] : 'unknown';

    // Only log if we're not skipping the trace
    if (!shouldSkipTrace) {
      console.log(`Forwarding request for ${url.pathname} to ${targetUrl}`);
    }

    const headers = new Headers(c.req.header());
    headers.delete('host');
    headers.set('host', new URL(TARGET_BASE_URL).host);
    headers.delete('content-length');
    headers.delete('transfer-encoding');

    let requestBody: BodyInit | null = null;
    let parsedRequestBody: any = null;
    const incomingContentType = c.req.header('content-type') || '';
    const methodNeedsBody = !['GET', 'HEAD'].includes(c.req.method);

    if (methodNeedsBody && c.req.raw.body) {
      if (incomingContentType.includes('application/json')) {
        try {
          const rawBodyText = await c.req.text();
          parsedRequestBody = JSON.parse(rawBodyText);
          requestBody = rawBodyText;
        } catch (e) {
          console.warn("Failed to parse incoming JSON body, forwarding raw body:", e);
          try {
            requestBody = await c.req.blob();
          } catch (blobError) {
            console.error("Could not retrieve request body after JSON parse failure:", blobError);
            requestBody = null;
          }
        }
      } else {
        requestBody = c.req.raw.body;
      }
    }

    let shouldCreateTrace = !shouldSkipTrace;
    let traceEntry: typeof requestTraces[0] = {
      timestamp_start: new Date().toISOString(),
      provider
    };

    if (parsedRequestBody) {
      if (parsedRequestBody.model) {
        traceEntry.model = parsedRequestBody.model;
      } else if (targetPath.includes('/models/') || targetPath.includes('/model/')) {
        const pathParts = targetPath.split('/');
        const modelIndex = pathParts.findIndex(part => part === 'models' || part === 'model');
        if (modelIndex >= 0 && pathParts.length > modelIndex + 1) {
          traceEntry.model = pathParts[modelIndex + 1];
        }
      }

      // Skip traces without a valid model
      if (!traceEntry.model || traceEntry.model === 'unknown') {
        shouldCreateTrace = false;
      }

      if (parsedRequestBody.messages) {
        traceEntry.messages = parsedRequestBody.messages;
        let promptText = '';
        for (const message of parsedRequestBody.messages) {
          if (message.content) {
            promptText += message.content;
          }
        }
        traceEntry.prompt_tokens = promptText.length;
      }

      if (parsedRequestBody.arguments) {
        traceEntry.arguments = parsedRequestBody.arguments;
      } else if (parsedRequestBody.parameters) {
        traceEntry.arguments = parsedRequestBody.parameters;
      }
    } else {
      // Skip traces without a request body
      shouldCreateTrace = false;
    }

    const traceIndex = shouldCreateTrace ? requestTraces.length : -1;
    if (shouldCreateTrace) {
      requestTraces.push(traceEntry);

      // Check if we need to flush based on batch size
      checkAndFlushTraces();
    }

    const response = await fetch(targetUrl, {
      method: c.req.method,
      headers: headers,
      body: requestBody,
    });

    // Only log if we're not skipping the trace
    if (!shouldSkipTrace) {
      console.log(`Received response status ${response.status} from ${targetUrl}`);
    }

    c.status(response.status as StatusCode);
    response.headers.forEach((value, key) => {
      if (key.toLowerCase() !== 'content-encoding' && key.toLowerCase() !== 'transfer-encoding') {
        c.header(key, value);
      }
    });

    if (!response.headers.has('content-type')) {
      c.header('content-type', 'application/octet-stream');
    }

    if (response.body) {
      const [streamForClient, streamForStorage] = response.body.tee();
      const contentType = response.headers.get('content-type');

      if (shouldCreateTrace && traceIndex >= 0) {
        storeStreamedResponse(streamForStorage, contentType, targetUrl, traceIndex).catch(err => {
          console.error("Error in background stream storage:", err);
        });
      }

      return stream(c, async (streamInstance) => {
        await streamInstance.pipe(streamForClient);
      });
    } else {
      // Only log if we're not skipping the trace
      if (!shouldSkipTrace) {
        console.log(`Received response with no body from ${targetUrl}.`);
      }

      if (shouldCreateTrace && traceIndex >= 0) {
        requestTraces[traceIndex].timestamp_end = new Date().toISOString();

        // Calculate duration if we have both timestamps
        if (requestTraces[traceIndex].timestamp_start && requestTraces[traceIndex].timestamp_end) {
          const startTime = new Date(requestTraces[traceIndex].timestamp_start).getTime();
          const endTime = new Date(requestTraces[traceIndex].timestamp_end).getTime();
          requestTraces[traceIndex].duration_ms = endTime - startTime;
        }

        // Check if we need to flush based on batch size
        checkAndFlushTraces();
      }

      return c.body(null);
    }
  } catch (error) {
    console.error('Error during proxy request:', error);
    return c.text('Internal Server Error', 500);
  }
});

// Ensure we flush any remaining traces when the process is terminating
process.on('SIGINT', () => {
  console.log('Process terminating, flushing remaining traces...');
  writeBatchedTraces().then(() => {
    process.exit();
  }).catch(err => {
    console.error('Error flushing traces on shutdown:', err);
    process.exit(1);
  });
});

process.on('SIGTERM', () => {
  console.log('Process terminating, flushing remaining traces...');
  writeBatchedTraces().then(() => {
    process.exit();
  }).catch(err => {
    console.error('Error flushing traces on shutdown:', err);
    process.exit(1);
  });
});

console.log(`Inference Proxy running on port ${PORT}`);
console.log(`Forwarding to: ${TARGET_BASE_URL}`);
console.log(`Logs directory: ${resolve(LOGS_DIR)}`);
console.log(`Batching: max ${BATCH_SIZE_LIMIT} traces or ${BATCH_TIME_LIMIT} minutes`);

export default {
  port: PORT,
  fetch: app.fetch,
};
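
/*
  Example usage (a sketch, not part of this file's behavior): with the proxy running
  locally (Bun-style `export default { port, fetch }` entrypoint) and assuming the router
  exposes an OpenAI-compatible chat completions route, point a client at the proxy instead
  of TARGET_BASE_URL. The model name below is a placeholder.

    curl http://localhost:4040/v1/chat/completions \
      -H "Authorization: Bearer $HF_API_KEY" \
      -H "Content-Type: application/json" \
      -d '{"model": "<provider-model-id>", "messages": [{"role": "user", "content": "Hello"}]}'

  The request is forwarded upstream and the resulting trace is batched into the
  <username>/traces dataset.
*/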