From 08633740f47bf6558c1bc774ff7bf09d6502faeb Mon Sep 17 00:00:00 2001 From: Chrislearn Young Date: Tue, 5 Aug 2025 07:41:35 +0800 Subject: [PATCH] Add mcp-host node and mcp-server node --- Cargo.lock | 1083 +++++++++++++++++--- Cargo.toml | 5 +- examples/mcp-host/README.md | 18 + examples/mcp-host/dataflow.yml | 44 + examples/mcp-host/empty_object.json | 6 + examples/mcp-host/local_object.json | 13 + examples/mcp-host/mcp_host.toml | 51 + examples/mcp-host/mcp_server.toml | 28 + examples/mcp-host/nodes/local.py | 52 + examples/mcp-host/nodes/telepathy.py | 46 + examples/mcp-host/restaurant_object.json | 13 + examples/mcp-host/test_client.py | 102 ++ examples/mcp-server/README.md | 20 + examples/mcp-server/config.toml | 32 + examples/mcp-server/dataflow.yml | 17 + examples/mcp-server/empty_object.json | 6 + examples/mcp-server/nodes/counter.py | 26 + node-hub/dora-mcp-host/Cargo.toml | 44 + node-hub/dora-mcp-host/src/client.rs | 212 ++++ node-hub/dora-mcp-host/src/config.rs | 313 ++++++ node-hub/dora-mcp-host/src/error.rs | 62 ++ node-hub/dora-mcp-host/src/main.rs | 183 ++++ node-hub/dora-mcp-host/src/routing.rs | 118 +++ node-hub/dora-mcp-host/src/session.rs | 221 ++++ node-hub/dora-mcp-host/src/tool.rs | 95 ++ node-hub/dora-mcp-host/src/utils.rs | 14 + node-hub/dora-mcp-server/Cargo.toml | 30 + node-hub/dora-mcp-server/README.md | 66 ++ node-hub/dora-mcp-server/src/config.rs | 102 ++ node-hub/dora-mcp-server/src/error.rs | 57 ++ node-hub/dora-mcp-server/src/main.rs | 176 ++++ node-hub/dora-mcp-server/src/mcp_server.rs | 129 +++ node-hub/dora-mcp-server/src/routing.rs | 71 ++ node-hub/dora-mcp-server/src/utils.rs | 3 + 34 files changed, 3312 insertions(+), 146 deletions(-) create mode 100644 examples/mcp-host/README.md create mode 100644 examples/mcp-host/dataflow.yml create mode 100644 examples/mcp-host/empty_object.json create mode 100644 examples/mcp-host/local_object.json create mode 100644 examples/mcp-host/mcp_host.toml create mode 100644 examples/mcp-host/mcp_server.toml create mode 100644 examples/mcp-host/nodes/local.py create mode 100644 examples/mcp-host/nodes/telepathy.py create mode 100644 examples/mcp-host/restaurant_object.json create mode 100644 examples/mcp-host/test_client.py create mode 100644 examples/mcp-server/README.md create mode 100644 examples/mcp-server/config.toml create mode 100644 examples/mcp-server/dataflow.yml create mode 100644 examples/mcp-server/empty_object.json create mode 100644 examples/mcp-server/nodes/counter.py create mode 100644 node-hub/dora-mcp-host/Cargo.toml create mode 100644 node-hub/dora-mcp-host/src/client.rs create mode 100644 node-hub/dora-mcp-host/src/config.rs create mode 100644 node-hub/dora-mcp-host/src/error.rs create mode 100644 node-hub/dora-mcp-host/src/main.rs create mode 100644 node-hub/dora-mcp-host/src/routing.rs create mode 100644 node-hub/dora-mcp-host/src/session.rs create mode 100644 node-hub/dora-mcp-host/src/tool.rs create mode 100644 node-hub/dora-mcp-host/src/utils.rs create mode 100644 node-hub/dora-mcp-server/Cargo.toml create mode 100644 node-hub/dora-mcp-server/README.md create mode 100644 node-hub/dora-mcp-server/src/config.rs create mode 100644 node-hub/dora-mcp-server/src/error.rs create mode 100644 node-hub/dora-mcp-server/src/main.rs create mode 100644 node-hub/dora-mcp-server/src/mcp_server.rs create mode 100644 node-hub/dora-mcp-server/src/routing.rs create mode 100644 node-hub/dora-mcp-server/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 3d46e6c7..4111beb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -552,7 +552,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.8.0", + "indexmap 2.10.0", "lexical-core", "memchr", "num", @@ -1026,6 +1026,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89cbf775b137e9b968e67227ef7f775587cde3fd31b0d8599dbd0f598a48340" +dependencies = [ + "bytemuck", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1225,6 +1234,20 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b7e4c2464d97fe331d41de9d5db0def0a96f4d823b8b32a2efd503578988973" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom 0.2.15", + "instant", + "pin-project-lite", + "rand 0.8.5", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -1829,7 +1852,7 @@ version = "0.13.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fe45e18904af7af10e4312df7c97251e98af98c70f42f1f2587aecfcbee56bf" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "lazy_static", "num-traits", "regex", @@ -2255,7 +2278,7 @@ dependencies = [ "bitflags 1.3.2", "core-foundation 0.9.4", "core-graphics-types", - "foreign-types", + "foreign-types 0.5.0", "libc", ] @@ -2576,6 +2599,16 @@ dependencies = [ "darling_macro 0.20.10", ] +[[package]] +name = "darling" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6b136475da5ef7b6ac596c0e956e37bad51b85b987ff3d5e230e964936736b2" +dependencies = [ + "darling_core 0.21.1", + "darling_macro 0.21.1", +] + [[package]] name = "darling_core" version = "0.11.0" @@ -2604,6 +2637,20 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "darling_core" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b44ad32f92b75fb438b04b68547e521a548be8acc339a6dacc4a7121488f53e6" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.101", +] + [[package]] name = "darling_macro" version = "0.11.0" @@ -2626,6 +2673,17 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "darling_macro" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b5be8a7a562d315a5b92a630c30cec6bcf663e6673f00fbb69cca66a6f521b9" +dependencies = [ + "darling_core 0.21.1", + "quote", + "syn 2.0.101", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -3119,6 +3177,54 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "dora-mcp-host" +version = "0.3.12" +dependencies = [ + "chrono", + "dora-node-api", + "eyre", + "figment", + "futures", + "indexmap 2.10.0", + "mime_guess", + "outfox-openai", + "reqwest", + "rmcp 0.3.2", + "salvo 0.81.0", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tracing", + "url", + "uuid 1.16.0", +] + +[[package]] +name = "dora-mcp-server" +version = "0.3.12" +dependencies = [ + "chrono", + "dora-node-api", + "eyre", + "figment", + "futures", + "indexmap 2.10.0", + "mime_guess", + "rmcp 0.2.1", + "salvo 0.80.0", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tracing", + "url", + "uuid 1.16.0", +] + [[package]] name = "dora-message" version = "0.5.0" @@ -3254,7 +3360,7 @@ dependencies = [ "eyre", "futures", "hyper 0.14.32", - "indexmap 2.8.0", + "indexmap 2.10.0", "mime_guess", "serde", "serde_json", @@ -4037,6 +4143,16 @@ dependencies = [ "cc", ] +[[package]] +name = "etag" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b3d0661a2ccddc26cba0b834e9b717959ed6fdd76c7129ee159c170a875bf44" +dependencies = [ + "str-buf", + "xxhash-rust", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -4086,6 +4202,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom 7.1.3", + "pin-project-lite", +] + [[package]] name = "exr" version = "1.73.0" @@ -4194,6 +4321,22 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "figment" +version = "0.10.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3" +dependencies = [ + "atomic", + "pear", + "serde", + "serde_json", + "serde_yaml 0.9.34+deprecated", + "toml", + "uncased", + "version_check", +] + [[package]] name = "filetime" version = "0.2.25" @@ -4313,6 +4456,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared 0.1.1", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -4320,7 +4472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" dependencies = [ "foreign-types-macros", - "foreign-types-shared", + "foreign-types-shared 0.3.1", ] [[package]] @@ -4334,6 +4486,12 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "foreign-types-shared" version = "0.3.1" @@ -5016,7 +5174,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.8.0", + "indexmap 2.10.0", "slab", "tokio", "tokio-util", @@ -5035,7 +5193,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap 2.8.0", + "indexmap 2.10.0", "slab", "tokio", "tokio-util", @@ -5083,6 +5241,30 @@ dependencies = [ "foldhash", ] +[[package]] +name = "headers" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" +dependencies = [ + "base64 0.22.1", + "bytes", + "headers-core", + "http 1.3.1", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.3.1", +] + [[package]] name = "heck" version = "0.3.3" @@ -5389,7 +5571,9 @@ dependencies = [ "http 1.3.1", "hyper 1.6.0", "hyper-util", + "log", "rustls 0.23.25", + "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", "tokio-rustls", @@ -5410,23 +5594,46 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" -version = "0.1.10" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" dependencies = [ + "base64 0.22.1", "bytes", "futures-channel", + "futures-core", "futures-util", "http 1.3.1", "http-body 1.0.1", "hyper 1.6.0", + "ipnet", + "libc", + "percent-encoding", "pin-project-lite", - "socket2 0.5.8", + "socket2 0.6.0", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -5678,9 +5885,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.8.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.2", @@ -5722,6 +5929,12 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a257582fdcde896fd96463bf2d40eefea0580021c0712a0e2b028b60b47a837a" +[[package]] +name = "inlinable_string" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" + [[package]] name = "inotify" version = "0.9.6" @@ -5852,6 +6065,17 @@ version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06432fb54d3be7964ecd3649233cddf80db2832f47fec34c01f65b3d9d774983" +[[package]] +name = "io-uring" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +dependencies = [ + "bitflags 2.9.0", + "cfg-if 1.0.0", + "libc", +] + [[package]] name = "ioctl-rs" version = "0.1.6" @@ -5894,6 +6118,16 @@ dependencies = [ "serde", ] +[[package]] +name = "iri-string" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is-terminal" version = "0.4.16" @@ -6261,9 +6495,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.171" +version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" [[package]] name = "libfuzzer-sys" @@ -6417,7 +6651,7 @@ source = "git+https://github.com/EricLBuehler/llguidance?rev=8d71957#8d7195774a2 dependencies = [ "anyhow", "derivre", - "indexmap 2.8.0", + "indexmap 2.10.0", "regex-syntax 0.8.5", "serde", "serde_json", @@ -6672,7 +6906,7 @@ dependencies = [ "bitflags 2.9.0", "block", "core-graphics-types", - "foreign-types", + "foreign-types 0.5.0", "log", "objc", "paste", @@ -6687,7 +6921,7 @@ dependencies = [ "bitflags 2.9.0", "block", "core-graphics-types", - "foreign-types", + "foreign-types 0.5.0", "log", "objc", "paste", @@ -6722,6 +6956,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime-infer" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d72eb6076a2d5a9c624f9282d5a4d3428303c7c6671067f8ef812d91a002710" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mime_guess" version = "2.0.5" @@ -6856,7 +7100,7 @@ dependencies = [ "either", "futures", "image", - "indexmap 2.8.0", + "indexmap 2.10.0", "mistralrs-core", "rand 0.9.1", "reqwest", @@ -6895,7 +7139,7 @@ dependencies = [ "half", "hf-hub", "image", - "indexmap 2.8.0", + "indexmap 2.10.0", "indicatif", "interprocess", "itertools 0.13.0", @@ -7012,6 +7256,32 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.3.1", + "httparse", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +dependencies = [ + "serde", +] + [[package]] name = "multiple-daemons-example-node" version = "0.3.12" @@ -7050,7 +7320,7 @@ dependencies = [ "cfg_aliases", "codespan-reporting 0.11.1", "hexf-parse", - "indexmap 2.8.0", + "indexmap 2.10.0", "log", "rustc-hash 1.1.0", "spirv", @@ -7124,6 +7394,23 @@ dependencies = [ "jobserver", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "natord" version = "1.0.9" @@ -7302,6 +7589,18 @@ dependencies = [ "memoffset 0.9.1", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.9.0", + "cfg-if 1.0.0", + "cfg_aliases", + "libc", +] + [[package]] name = "no-std-net" version = "0.6.0" @@ -7894,25 +8193,51 @@ dependencies = [ ] [[package]] -name = "openssl-probe" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" - -[[package]] -name = "openssl-src" -version = "300.4.2+3.4.1" +name = "openssl" +version = "0.10.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168ce4e058f975fe43e89d9ccf78ca668601887ae736090aacc23ae353c298e2" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" dependencies = [ - "cc", + "bitflags 2.9.0", + "cfg-if 1.0.0", + "foreign-types 0.3.2", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-src" +version = "300.4.2+3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "168ce4e058f975fe43e89d9ccf78ca668601887ae736090aacc23ae353c298e2" +dependencies = [ + "cc", ] [[package]] name = "openssl-sys" -version = "0.9.106" +version = "0.9.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" dependencies = [ "cc", "libc", @@ -8156,6 +8481,41 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" +[[package]] +name = "outfox-openai" +version = "0.1.0" +source = "git+https://github.com/outfox-ai/outfox.git#9c4bb156d60ea11d540cf66f38129de059693f59" +dependencies = [ + "backoff", + "base64 0.22.1", + "bytes", + "derive_builder", + "eventsource-stream", + "futures", + "outfox-openai-macros", + "rand 0.8.5", + "reqwest", + "reqwest-eventsource", + "secrecy 0.10.3", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + +[[package]] +name = "outfox-openai-macros" +version = "0.1.0" +source = "git+https://github.com/outfox-ai/outfox.git#9c4bb156d60ea11d540cf66f38129de059693f59" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "overload" version = "0.1.1" @@ -8266,6 +8626,29 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" +[[package]] +name = "pear" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467" +dependencies = [ + "inlinable_string", + "pear_codegen", + "yansi", +] + +[[package]] +name = "pear_codegen" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147" +dependencies = [ + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.101", +] + [[package]] name = "peg" version = "0.6.3" @@ -8369,7 +8752,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset 0.4.2", - "indexmap 2.8.0", + "indexmap 2.10.0", ] [[package]] @@ -8525,7 +8908,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac26e981c03a6e53e0aee43c113e3202f5581d5360dae7bd2c70e800dd0451d" dependencies = [ "base64 0.22.1", - "indexmap 2.8.0", + "indexmap 2.10.0", "quick-xml 0.32.0", "serde", "time", @@ -8818,6 +9201,33 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", + "version_check", + "yansi", +] + +[[package]] +name = "process-wrap" +version = "8.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3ef4f2f0422f23a82ec9f628ea2acd12871c81a9362b02c43c1aa86acfc3ba1" +dependencies = [ + "futures", + "indexmap 2.10.0", + "nix 0.30.1", + "tokio", + "tracing", + "windows 0.61.1", +] + [[package]] name = "profiling" version = "1.0.16" @@ -9964,7 +10374,7 @@ dependencies = [ "tokio-util", "tonic", "tonic-web", - "tower-http", + "tower-http 0.5.2", ] [[package]] @@ -10984,7 +11394,7 @@ dependencies = [ "half", "home", "image", - "indexmap 2.8.0", + "indexmap 2.10.0", "itertools 0.14.0", "linked-hash-map", "ndarray 0.16.1", @@ -11226,9 +11636,9 @@ checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832" [[package]] name = "reqwest" -version = "0.12.15" +version = "0.12.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" +checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531" dependencies = [ "base64 0.22.1", "bytes", @@ -11242,35 +11652,52 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-rustls", + "hyper-tls", "hyper-util", - "ipnet", "js-sys", "log", "mime", - "once_cell", + "mime_guess", + "native-tls", "percent-encoding", "pin-project-lite", "quinn 0.11.7", "rustls 0.23.25", - "rustls-pemfile 2.2.0", + "rustls-native-certs 0.8.1", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", - "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", "tokio-util", "tower 0.5.2", + "tower-http 0.6.6", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.26.8", - "windows-registry", + "webpki-roots 1.0.2", +] + +[[package]] +name = "reqwest-eventsource" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde" +dependencies = [ + "eventsource-stream", + "futures-core", + "futures-timer", + "mime", + "nom 7.1.3", + "pin-project-lite", + "reqwest", + "thiserror 1.0.69", ] [[package]] @@ -11300,7 +11727,7 @@ dependencies = [ "crossbeam", "document-features", "env_filter", - "indexmap 2.8.0", + "indexmap 2.10.0", "indicatif", "itertools 0.14.0", "log", @@ -11428,6 +11855,77 @@ dependencies = [ "crossbeam", ] +[[package]] +name = "rmcp" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37f2048a81a7ff7e8ef6bc5abced70c3d9114c8f03d85d7aaaafd9fd04f12e9e" +dependencies = [ + "base64 0.22.1", + "chrono", + "futures", + "paste", + "pin-project-lite", + "rmcp-macros 0.2.1", + "schemars 0.8.22", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "rmcp" +version = "0.3.2" +source = "git+https://github.com/modelcontextprotocol/rust-sdk.git?rev=fbc7ab7#fbc7ab70cab26fd4f8897e5f88463cd442e7c59d" +dependencies = [ + "base64 0.22.1", + "chrono", + "futures", + "http 1.3.1", + "paste", + "pin-project-lite", + "process-wrap", + "reqwest", + "rmcp-macros 0.3.2", + "schemars 1.0.4", + "serde", + "serde_json", + "sse-stream", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + +[[package]] +name = "rmcp-macros" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72398e694b9f6dbb5de960cf158c8699e6a1854cb5bbaac7de0646b2005763c4" +dependencies = [ + "darling 0.20.10", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.101", +] + +[[package]] +name = "rmcp-macros" +version = "0.3.2" +source = "git+https://github.com/modelcontextprotocol/rust-sdk.git?rev=fbc7ab7#fbc7ab70cab26fd4f8897e5f88463cd442e7c59d" +dependencies = [ + "darling 0.21.1", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.101", +] + [[package]] name = "rmp" version = "0.8.14" @@ -11924,6 +12422,232 @@ dependencies = [ "serde_json", ] +[[package]] +name = "salvo" +version = "0.80.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ce0331005d85590a43295118391452dd04cf7826c2f8251c2362874b8d8b67a" +dependencies = [ + "salvo-cors 0.80.0", + "salvo_core 0.80.0", + "salvo_extra 0.80.0", +] + +[[package]] +name = "salvo" +version = "0.81.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cc05fe965153805873da91f7a0a65e69a4173fff562ed553f8e2e6a9ff5bce7" +dependencies = [ + "salvo-cors 0.81.0", + "salvo_core 0.81.0", + "salvo_extra 0.81.0", +] + +[[package]] +name = "salvo-cors" +version = "0.80.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "307be0c582399615bc81e2b5706647f3cf2f8a98edb294a8344917facb17e648" +dependencies = [ + "bytes", + "salvo_core 0.80.0", + "tracing", +] + +[[package]] +name = "salvo-cors" +version = "0.81.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c791806c8f21342ec558a9b7b6fe7d2e6a0d9d98e85b487e01804a8d72db0453" +dependencies = [ + "bytes", + "salvo_core 0.81.0", + "tracing", +] + +[[package]] +name = "salvo-serde-util" +version = "0.80.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2014d193ab04bf917c574c155801279474becd532a02f7dbe64928bdd2b072a4" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "salvo-serde-util" +version = "0.81.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b2cd0a2b0073c85f10eefa3722dbae18e3c2c9a2567f9d840ecc712127e30c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "salvo_core" +version = "0.80.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb81e2093c75ff8c3273196c7b5e3b2fb9c5f838ef7e925753ce7e58ddbd6fe4" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "enumflags2", + "form_urlencoded", + "futures-channel", + "futures-util", + "headers", + "http 1.3.1", + "http-body-util", + "hyper 1.6.0", + "hyper-rustls", + "hyper-util", + "indexmap 2.10.0", + "mime", + "mime-infer", + "multer", + "multimap", + "nix 0.30.1", + "parking_lot", + "percent-encoding", + "pin-project", + "rand 0.9.1", + "regex", + "rustls-pemfile 2.2.0", + "salvo_macros 0.80.0", + "serde", + "serde-xml-rs 0.8.1", + "serde_json", + "sync_wrapper", + "tempfile", + "thiserror 2.0.12", + "tokio", + "tokio-rustls", + "tokio-util", + "tracing", +] + +[[package]] +name = "salvo_core" +version = "0.81.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0978458bee0102c6c337040ea0b13c497ff1e31015c49c2cc9a387f813e2c1" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "enumflags2", + "form_urlencoded", + "futures-channel", + "futures-util", + "headers", + "http 1.3.1", + "http-body-util", + "hyper 1.6.0", + "hyper-rustls", + "hyper-util", + "indexmap 2.10.0", + "mime", + "mime-infer", + "multer", + "multimap", + "nix 0.30.1", + "parking_lot", + "percent-encoding", + "pin-project", + "rand 0.9.1", + "regex", + "rustls-pemfile 2.2.0", + "salvo_macros 0.81.0", + "serde", + "serde-xml-rs 0.8.1", + "serde_json", + "sync_wrapper", + "tempfile", + "thiserror 2.0.12", + "tokio", + "tokio-rustls", + "tokio-util", + "tracing", +] + +[[package]] +name = "salvo_extra" +version = "0.80.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f90ca584472adbb3dacf4e82bdf46f8022771916342d598b5521f6bfe6b8bfd" +dependencies = [ + "base64 0.22.1", + "etag", + "futures-util", + "http-body-util", + "hyper 1.6.0", + "pin-project", + "salvo_core 0.80.0", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite 0.27.0", + "tower 0.5.2", + "tracing", + "ulid", +] + +[[package]] +name = "salvo_extra" +version = "0.81.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02e85aa76fe3213ddf45eb9d58a5dace805c2d655804283e0fd4ba4c6a67811" +dependencies = [ + "base64 0.22.1", + "etag", + "futures-util", + "http-body-util", + "hyper 1.6.0", + "pin-project", + "salvo_core 0.81.0", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite 0.27.0", + "tower 0.5.2", + "tracing", + "ulid", +] + +[[package]] +name = "salvo_macros" +version = "0.80.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83c13353864ea07e0673511a843b762c775b7f9364657ff5c49ad5fccc5ce0a0" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "salvo-serde-util 0.80.0", + "syn 2.0.101", +] + +[[package]] +name = "salvo_macros" +version = "0.81.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b74585140d4b656e51bd95e985c7acb8e57fe3d25f2dbb7c2b21fec30a8a91ef" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "salvo-serde-util 0.81.0", + "syn 2.0.101", +] + [[package]] name = "same-file" version = "1.0.6" @@ -11954,6 +12678,7 @@ version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" dependencies = [ + "chrono", "dyn-clone", "either", "schemars_derive 0.8.22", @@ -11967,6 +12692,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" dependencies = [ + "chrono", "dyn-clone", "ref-cast", "schemars_derive 1.0.4", @@ -12049,6 +12775,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "secrecy" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" +dependencies = [ + "serde", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -12186,6 +12922,18 @@ dependencies = [ "xml-rs", ] +[[package]] +name = "serde-xml-rs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53630160a98edebde0123eb4dfd0fce6adff091b2305db3154a9e920206eb510" +dependencies = [ + "log", + "serde", + "thiserror 1.0.69", + "xml-rs", +] + [[package]] name = "serde_assert" version = "0.7.1" @@ -12232,7 +12980,7 @@ version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "itoa", "memchr", "ryu", @@ -12290,7 +13038,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.8.0", + "indexmap 2.10.0", "serde", "serde_derive", "serde_json", @@ -12328,7 +13076,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "itoa", "ryu", "serde", @@ -12759,6 +13507,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "socketpair" version = "0.19.6" @@ -12872,6 +13630,19 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "sse-stream" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb4dc4d33c68ec1f27d386b5610a351922656e1fdf5c05bbaad930cd1519479a" +dependencies = [ + "bytes", + "futures-util", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", +] + [[package]] name = "ssri" version = "9.2.0" @@ -12954,6 +13725,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "str-buf" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ceb97b7225c713c2fd4db0153cb6b3cab244eb37900c3f634ed4d43310d8c34" + [[package]] name = "strict-num" version = "0.1.1" @@ -13259,9 +14036,9 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "tempfile" -version = "3.19.1" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ "fastrand 2.3.0", "getrandom 0.3.2", @@ -13601,20 +14378,22 @@ dependencies = [ [[package]] name = "tokio" -version = "1.44.2" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio 1.0.3", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.8", + "slab", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -13628,6 +14407,16 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rayon" version = "2.1.0" @@ -13669,7 +14458,19 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.24.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.27.0", ] [[package]] @@ -13718,7 +14519,7 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd87a5cdd6ffab733b2f74bc4fd7ee5fff6634124999ac278c35fc78c6120148" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "serde", "serde_spanned", "toml_datetime", @@ -13740,7 +14541,7 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "serde", "serde_spanned", "toml_datetime", @@ -13794,7 +14595,7 @@ dependencies = [ "pin-project", "tokio-stream", "tonic", - "tower-http", + "tower-http 0.5.2", "tower-layer", "tower-service", "tracing", @@ -13856,8 +14657,10 @@ dependencies = [ "pin-project-lite", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -13876,6 +14679,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower-http" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +dependencies = [ + "bitflags 2.9.0", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "iri-string", + "pin-project-lite", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -14029,6 +14850,19 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d" +dependencies = [ + "bytes", + "log", + "rand 0.9.1", + "thiserror 2.0.12", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" @@ -14100,6 +14934,25 @@ dependencies = [ "spin 0.9.8", ] +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.1", + "web-time", +] + +[[package]] +name = "uncased" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" +dependencies = [ + "version_check", +] + [[package]] name = "unchecked-index" version = "0.2.2" @@ -14226,7 +15079,7 @@ dependencies = [ "quick-xml 0.36.2", "regex", "serde", - "serde-xml-rs", + "serde-xml-rs 0.6.0", "thiserror 1.0.69", ] @@ -14819,6 +15672,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "weezl" version = "0.1.8" @@ -14862,7 +15724,7 @@ dependencies = [ "bitflags 2.9.0", "cfg_aliases", "document-features", - "indexmap 2.8.0", + "indexmap 2.10.0", "log", "naga", "once_cell", @@ -15235,13 +16097,13 @@ dependencies = [ [[package]] name = "windows-registry" -version = "0.4.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3" +checksum = "ad1da3e436dc7653dfdf3da67332e22bff09bb0e28b0239e1624499c7830842e" dependencies = [ + "windows-link", "windows-result 0.3.2", - "windows-strings 0.3.1", - "windows-targets 0.53.0", + "windows-strings 0.4.0", ] [[package]] @@ -15281,15 +16143,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-strings" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87fa48cc5d406560701792be122a10132491cff9d0aeb23583cc2dcafc847319" -dependencies = [ - "windows-link", -] - [[package]] name = "windows-strings" version = "0.4.0" @@ -15389,29 +16242,13 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", + "windows_i686_gnullvm", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows-targets" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e4c7e8ceaaf9cb7d7507c974735728ab453b67ef8f18febdd7c11fe59dca8b" -dependencies = [ - "windows_aarch64_gnullvm 0.53.0", - "windows_aarch64_msvc 0.53.0", - "windows_i686_gnu 0.53.0", - "windows_i686_gnullvm 0.53.0", - "windows_i686_msvc 0.53.0", - "windows_x86_64_gnu 0.53.0", - "windows_x86_64_gnullvm 0.53.0", - "windows_x86_64_msvc 0.53.0", -] - [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -15430,12 +16267,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" - [[package]] name = "windows_aarch64_msvc" version = "0.34.0" @@ -15460,12 +16291,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" - [[package]] name = "windows_i686_gnu" version = "0.34.0" @@ -15490,24 +16315,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" - [[package]] name = "windows_i686_msvc" version = "0.34.0" @@ -15532,12 +16345,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" - [[package]] name = "windows_x86_64_gnu" version = "0.34.0" @@ -15562,12 +16369,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnu" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" - [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -15586,12 +16387,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" - [[package]] name = "windows_x86_64_msvc" version = "0.34.0" @@ -15616,12 +16411,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" - [[package]] name = "winit" version = "0.30.9" @@ -15888,6 +16677,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yoke" version = "0.7.5" @@ -16259,7 +17054,7 @@ checksum = "decb0881ecbd88bc8eb68d0440ff96c1b77548bced3c58ea9e14fe2a5c554162" dependencies = [ "json5", "num_cpus", - "secrecy", + "secrecy 0.8.0", "serde", "serde_json", "serde_with", @@ -16460,7 +17255,7 @@ dependencies = [ "rustls-pemfile 2.2.0", "rustls-pki-types", "rustls-webpki 0.102.8", - "secrecy", + "secrecy 0.8.0", "time", "tokio", "tokio-util", @@ -16544,7 +17339,7 @@ dependencies = [ "rustls-pemfile 2.2.0", "rustls-pki-types", "rustls-webpki 0.102.8", - "secrecy", + "secrecy 0.8.0", "socket2 0.5.8", "time", "tls-listener", @@ -16646,7 +17441,7 @@ dependencies = [ "async-trait", "futures-util", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.24.0", "tokio-util", "tracing", "url", @@ -17040,7 +17835,7 @@ dependencies = [ "crc32fast", "crossbeam-utils", "displaydoc", - "indexmap 2.8.0", + "indexmap 2.10.0", "num_enum", "thiserror 1.0.69", ] @@ -17055,7 +17850,7 @@ dependencies = [ "crc32fast", "crossbeam-utils", "displaydoc", - "indexmap 2.8.0", + "indexmap 2.10.0", "memchr", "thiserror 2.0.12", "time", diff --git a/Cargo.toml b/Cargo.toml index 4067735d..25c72712 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,8 @@ members = [ "libraries/shared-memory-server", "libraries/extensions/download", "libraries/extensions/telemetry/*", + "node-hub/dora-mcp-host", + "node-hub/dora-mcp-server", "node-hub/dora-record", "node-hub/dora-rerun", "node-hub/terminal-print", @@ -48,7 +50,7 @@ members = [ ] [workspace.package] -edition = "2024" +edition = "2021" rust-version = "1.85.0" # Make sure to also bump `apis/node/python/__init__.py` version. version = "0.3.12" @@ -104,7 +106,6 @@ edition.workspace = true license = "Apache-2.0" publish = false - [features] # enables examples that depend on a sourced ROS2 installation ros2-examples = [] diff --git a/examples/mcp-host/README.md b/examples/mcp-host/README.md new file mode 100644 index 00000000..d7825770 --- /dev/null +++ b/examples/mcp-host/README.md @@ -0,0 +1,18 @@ +# Dora Openai MCP Host Example + +This is a quick example to showcase how use the `dora-openai-server` to receive and send back data. + +Dora Openai Server is still experimental and may change in the future. + +Make sure to have, dora, uv and cargo installed. + +```bash +uv venv -p 3.11 --seed +uv pip install -e ../../apis/python/node --reinstall +dora build dataflow.yml --uv +dora run dataflow.yml --uv + +# In a separate terminal +uv run test_client.py +dora stop +``` diff --git a/examples/mcp-host/dataflow.yml b/examples/mcp-host/dataflow.yml new file mode 100644 index 00000000..3b31d62e --- /dev/null +++ b/examples/mcp-host/dataflow.yml @@ -0,0 +1,44 @@ +nodes: + - id: mcp-server + build: cargo build -p dora-mcp-server --release + path: ../../target/release/dora-mcp-server + outputs: + - local + - telepathy + inputs: + local_reply: local/reply + telepathy_reply: telepathy/reply + env: + CONFIG: mcp_server.toml + + - id: local + path: nodes/local.py + inputs: + text: mcp-server/local + outputs: + - reply + + - id: telepathy + path: nodes/telepathy.py + inputs: + text: mcp-server/telepathy + outputs: + - reply + + - id: dora-echo + build: pip install -e ../../node-hub/dora-echo + path: dora-echo + inputs: + echo: dora-mcp-host/text + outputs: + - echo + + - id: dora-mcp-host + build: cargo build -p dora-mcp-host --release + path: ../../target/release/dora-mcp-host + outputs: + - text + inputs: + text: dora-echo/echo + env: + CONFIG: mcp_host.toml \ No newline at end of file diff --git a/examples/mcp-host/empty_object.json b/examples/mcp-host/empty_object.json new file mode 100644 index 00000000..d217dbf7 --- /dev/null +++ b/examples/mcp-host/empty_object.json @@ -0,0 +1,6 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + } +} \ No newline at end of file diff --git a/examples/mcp-host/local_object.json b/examples/mcp-host/local_object.json new file mode 100644 index 00000000..f473ca9f --- /dev/null +++ b/examples/mcp-host/local_object.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "location" + } + }, + "required": [ + "location" + ] +} \ No newline at end of file diff --git a/examples/mcp-host/mcp_host.toml b/examples/mcp-host/mcp_host.toml new file mode 100644 index 00000000..d6ff845b --- /dev/null +++ b/examples/mcp-host/mcp_host.toml @@ -0,0 +1,51 @@ +listen_addr = "0.0.0.0:8118" +endpoint = "v1" + +[[providers]] +id = "moonshot" +kind ="openai" +api_key = "env:MOONSHOT_API_KEY" +api_url = "https://api.moonshot.cn/v1" + +[[providers]] +id = "gemini" +kind ="gemini" +# api_key = "env:GEMINI_API_KEY" +api_url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent" + +# [[providers]] +# id = "deepseek" +# kind ="deepseek" +# # api_key = "env:DEEPSEEK_API_KEY" +# api_url = "https://api.deepseek.com" + +# [[providers]] +# id = "openai" +# # api_key = "env:OPENAI_API_KEY" +# api_url = "https://api.openai.com/v1" + +[[providers]] +id = "dora" +kind ="dora" +output = "output" + +[[models]] +id = "kimi-latest" +# default = true +route = { provider = "moonshot", model = "kimi-latest" } + +[[models]] +id = "gemini-2.0-flash" +route = { provider = "gemini", model = "gemini-2.0-flash" } + +[[mcp.servers]] +name = "amap-maps" +protocol = "stdio" +command = "npx" +args = ["-y", "@amap/amap-maps-mcp-server"] +envs = {AMAP_MAPS_API_KEY = "your_amap_maps_api_key_here"} + +[[mcp.servers]] +name = "local" +protocol = "streamable" +url = "http://127.0.0.1:8228/mcp" \ No newline at end of file diff --git a/examples/mcp-host/mcp_server.toml b/examples/mcp-host/mcp_server.toml new file mode 100644 index 00000000..7aa14b84 --- /dev/null +++ b/examples/mcp-host/mcp_server.toml @@ -0,0 +1,28 @@ +name = "MCP Server Example" +version = "0.1.0" + +# You can set your custom listen address and endpoint here. +# Default listen address is "0.0.0.0:8008" and endpoint is "mcp". +listen_addr = "0.0.0.0:8228" +endpoint = "mcp" + +[[mcp_tools]] +name = "signature_dish" +description = "Tell you the name of the most signature dish in a certain restaurant." +args = [] +input_schema = "restaurant_object.json" +output = "local" + +[[mcp_tools]] +name = "happiest_kindergarten" +description = "Tell you the happiest kindergarten in this location and how many children are in this kindergarten." +args = [] +input_schema = "local_object.json" +output = "local" + +[[mcp_tools]] +name = "telepathy" +description = "Know who the current user's favorite star is and what their favorite movie is." +args = [] +input_schema = "empty_object.json" +output = "telepathy" diff --git a/examples/mcp-host/nodes/local.py b/examples/mcp-host/nodes/local.py new file mode 100644 index 00000000..5fa0f02c --- /dev/null +++ b/examples/mcp-host/nodes/local.py @@ -0,0 +1,52 @@ +""" +This is just a simple demonstration of an MCP server. + +The example returns some local information about the user's request, such as the tallest building, +the happiest kindergarten, the best restaurant, etc. +""" + +import pyarrow as pa +from dora import Node +import json + +import random + +signature_dishes = [ + "Kung Pao Chicken", + "Mapo Tofu", + "Twice Cooked Pork", + "Sweet and Sour Pork", + "Boiled Fish in Chili Oil", + "Peking Duck", + "Xiaolongbao", + "Red Braised Pork", + "Fish-Flavored Shredded Pork", + "Dongpo Pork", + "White Cut Chicken", + "Steamed Egg Custard", + "Fish with Pickled Cabbage", + "Saliva Chicken", + "Spicy Beef and Ox Tongue", + "Laziji (Spicy Diced Chicken)", + "Steamed Sea Bass", + "Ants Climbing a Tree", + "Beggar's Chicken", + "Buddha Jumps Over the Wall" +] + +node = Node() + +for event in node: + if event["type"] == "INPUT": + if 'metadata' in event: + data = json.loads(event["value"][0].as_py()) + name = data.get("name", "") + location = data.get("arguments", {}).get("location", "") + match name: + case "signature_dish": + random_dish = random.choice(signature_dishes) + node.send_output("reply", pa.array([f'{{"content":[{{"type": "text", "text": "{{\\"signature_dish\\": \\"{random_dish}\\"}}"}}]}}']), metadata=event["metadata"]) + case "happiest_kindergarten": + node.send_output("reply", pa.array([f'{{"content":[{{"type": "text", "text": "{{\\"kindergarten\\":\\"Golden Sun Kindergarten\\", \\"children\\": 300}}"}}]}}']), metadata=event["metadata"]) + case _: + print(f"Unknown command: {name}") \ No newline at end of file diff --git a/examples/mcp-host/nodes/telepathy.py b/examples/mcp-host/nodes/telepathy.py new file mode 100644 index 00000000..669cfdbe --- /dev/null +++ b/examples/mcp-host/nodes/telepathy.py @@ -0,0 +1,46 @@ +""" +This is just a simple demonstration of an MCP server. + +This MCP server has the ability of telepathy and can know who the current +user's favorite star is and what their favorite movie is. +""" + +import pyarrow as pa +from dora import Node +import json +import random + +star_movie_pairs = [ + {"star": "Tom Hanks", "movie": "Forrest Gump"}, + {"star": "Leonardo DiCaprio", "movie": "Titanic"}, + {"star": "Will Smith", "movie": "Men in Black"}, + {"star": "Robert Downey Jr.", "movie": "Iron Man"}, + {"star": "Johnny Depp", "movie": "Pirates of the Caribbean"}, + {"star": "Brad Pitt", "movie": "Fight Club"}, + {"star": "Angelina Jolie", "movie": "Maleficent"}, + {"star": "Scarlett Johansson", "movie": "Black Widow"}, + {"star": "Chris Evans", "movie": "Captain America"}, + {"star": "Ryan Reynolds", "movie": "Deadpool"}, + {"star": "Emma Stone", "movie": "La La Land"}, + {"star": "Jennifer Lawrence", "movie": "The Hunger Games"}, + {"star": "Morgan Freeman", "movie": "The Shawshank Redemption"}, + {"star": "Denzel Washington", "movie": "Training Day"}, + {"star": "Matt Damon", "movie": "The Martian"}, +] + +node = Node() + +for event in node: + if event["type"] == "INPUT": + if 'metadata' in event: + data = json.loads(event["value"][0].as_py()) + name = data.get("name", "") + location = data.get("arguments", {}).get("location", "") + match name: + case "telepathy": + random_pair = random.choice(star_movie_pairs) + star = random_pair["star"] + movie = random_pair["movie"] + node.send_output("reply", pa.array([f'{{"content":[{{"type": "text", "text": "{{\\"star\\":\\"{star}\\", \\"movie\\":\\"{movie}\\"}}"}}]}}']), metadata=event["metadata"]) + case _: + print(f"Unknown command: {name}") \ No newline at end of file diff --git a/examples/mcp-host/restaurant_object.json b/examples/mcp-host/restaurant_object.json new file mode 100644 index 00000000..091dd674 --- /dev/null +++ b/examples/mcp-host/restaurant_object.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "restaurant": { + "type": "string", + "description": "restaurant name" + } + }, + "required": [ + "restaurant" + ] +} \ No newline at end of file diff --git a/examples/mcp-host/test_client.py b/examples/mcp-host/test_client.py new file mode 100644 index 00000000..dbdfeea9 --- /dev/null +++ b/examples/mcp-host/test_client.py @@ -0,0 +1,102 @@ +"""TODO: Add docstring.""" +from openai import OpenAI +import httpx + +transport = httpx.HTTPTransport(proxy=None) +http_client = httpx.Client(transport=transport) +client = OpenAI(base_url="http://127.0.0.1:8118/v1", api_key="dummy_api_key", http_client=http_client) + +def test_list_models(): + """TODO: Add docstring.""" + try: + models = client.models.list() + print("Available models:") + for model in models.data: + print(f"- {model.id}") + except Exception as e: + print(f"Error listing models: {e}") + + +def test_chat_completion(user_input): + """TODO: Add docstring.""" + try: + response = client.chat.completions.create( + model="kimi-latest", + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": user_input}, + ], + ) + print("Chat Completion Response:") + print(response.choices[0].message.content) + except Exception as e: + print(f"Error in chat completion: {e}") + + +def test_chat_completion_image_url(user_input): + """TODO: Add docstring.""" + try: + response = client.chat.completions.create( + model="kimi-latest", + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": "What is in this image?"}, + { + "type": "image_url", + "image_url": { + "url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" + }, + }, + ], + } + ], + ) + print("Chat Completion Response:") + print(response.choices[0].message.content) + except Exception as e: + print(f"Error in chat completion: {e}") + + +def test_chat_completion_image_base64(user_input): + """TODO: Add docstring.""" + try: + response = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": "What is in this image?"}, + { + "type": "image_url", + "image_url": { + "url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAAApgAAAKYB3X3/OAAAABl0RVh0U29mdHdhcmUAd3d3Lmlua3NjYXBlLm9yZ5vuPBoAAANCSURBVEiJtZZPbBtFFMZ/M7ubXdtdb1xSFyeilBapySVU8h8OoFaooFSqiihIVIpQBKci6KEg9Q6H9kovIHoCIVQJJCKE1ENFjnAgcaSGC6rEnxBwA04Tx43t2FnvDAfjkNibxgHxnWb2e/u992bee7tCa00YFsffekFY+nUzFtjW0LrvjRXrCDIAaPLlW0nHL0SsZtVoaF98mLrx3pdhOqLtYPHChahZcYYO7KvPFxvRl5XPp1sN3adWiD1ZAqD6XYK1b/dvE5IWryTt2udLFedwc1+9kLp+vbbpoDh+6TklxBeAi9TL0taeWpdmZzQDry0AcO+jQ12RyohqqoYoo8RDwJrU+qXkjWtfi8Xxt58BdQuwQs9qC/afLwCw8tnQbqYAPsgxE1S6F3EAIXux2oQFKm0ihMsOF71dHYx+f3NND68ghCu1YIoePPQN1pGRABkJ6Bus96CutRZMydTl+TvuiRW1m3n0eDl0vRPcEysqdXn+jsQPsrHMquGeXEaY4Yk4wxWcY5V/9scqOMOVUFthatyTy8QyqwZ+kDURKoMWxNKr2EeqVKcTNOajqKoBgOE28U4tdQl5p5bwCw7BWquaZSzAPlwjlithJtp3pTImSqQRrb2Z8PHGigD4RZuNX6JYj6wj7O4TFLbCO/Mn/m8R+h6rYSUb3ekokRY6f/YukArN979jcW+V/S8g0eT/N3VN3kTqWbQ428m9/8k0P/1aIhF36PccEl6EhOcAUCrXKZXXWS3XKd2vc/TRBG9O5ELC17MmWubD2nKhUKZa26Ba2+D3P+4/MNCFwg59oWVeYhkzgN/JDR8deKBoD7Y+ljEjGZ0sosXVTvbc6RHirr2reNy1OXd6pJsQ+gqjk8VWFYmHrwBzW/n+uMPFiRwHB2I7ih8ciHFxIkd/3Omk5tCDV1t+2nNu5sxxpDFNx+huNhVT3/zMDz8usXC3ddaHBj1GHj/As08fwTS7Kt1HBTmyN29vdwAw+/wbwLVOJ3uAD1wi/dUH7Qei66PfyuRj4Ik9is+hglfbkbfR3cnZm7chlUWLdwmprtCohX4HUtlOcQjLYCu+fzGJH2QRKvP3UNz8bWk1qMxjGTOMThZ3kvgLI5AzFfo379UAAAAASUVORK5CYII=" + }, + }, + ], + } + ], + ) + print("Chat Completion Response:") + print(response.choices[0].message.content) + except Exception as e: + print(f"Error in chat completion: {e}") + + +if __name__ == "__main__": + print("Testing API endpoints...") + # test_list_models() + print("\n" + "=" * 50 + "\n") + + while True: + chat_input = input("Enter a message for chat completion: ") + test_chat_completion(chat_input) + + print("\n" + "=" * 50 + "\n") + + # test_chat_completion_image_url(chat_input) + # print("\n" + "=" * 50 + "\n") + # test_chat_completion_image_base64(chat_input) + # print("\n" + "=" * 50 + "\n") diff --git a/examples/mcp-server/README.md b/examples/mcp-server/README.md new file mode 100644 index 00000000..cc0a4416 --- /dev/null +++ b/examples/mcp-server/README.md @@ -0,0 +1,20 @@ +# Dora MCP Server Example + +This is a quick example to showcase how use the `dora-mcp-server` to receive and send back data. + +Dora MCP Server is still experimental and may change in the future. + +Make sure to have, dora, uv and cargo installed. + +```bash +uv venv -p 3.11 --seed +uv pip install -e ../../apis/python/node --reinstall +dora build dataflow.yml --uv +dora run dataflow.yml --uv +``` + +You can use mpc inspector to test: + +```bash +npx @modelcontextprotocol/inspector +``` \ No newline at end of file diff --git a/examples/mcp-server/config.toml b/examples/mcp-server/config.toml new file mode 100644 index 00000000..c264c37b --- /dev/null +++ b/examples/mcp-server/config.toml @@ -0,0 +1,32 @@ +name = "MCP Server Example" +version = "0.1.0" + +# You can set your custom listen address and endpoint here. +# Default listen address is "0.0.0.0:8008" and endpoint is "mcp". +# In this example, the final service url is: http://0.0.0.0:8181/mcp +listen_addr = "0.0.0.0:8181" +endpoint = "mcp" + +[[mcp_tools]] +name = "counter_decrement" # (Required) type: String, Unique identifier for the tool +title = "Decrement Counter" # (Optional) type: String, Human-readable name of the tool for display purposes +input_schema = "empty_object.json" # (Required) JSON Schema defining expected parameters +output = "counter" # (Required) type: String, Set the output name +[mcp_tools.annotations] # (Optional) Additional properties describing a Tool to clients +title = "decrement current value of the counter" # type: String, A human-readable title for the tool + +[[mcp_tools]] +name = "counter_increment" +title = "Increment Counter" +input_schema = "empty_object.json" +output = "counter" +[mcp_tools.annotations] +title = "Increment current value of the counter" + +[[mcp_tools]] +name = "counter_get_value" +title = "Get Counter Value" +input_schema = "empty_object.json" +output = "counter" +[mcp_tools.annotations] +title = "Get the current value of the counter" \ No newline at end of file diff --git a/examples/mcp-server/dataflow.yml b/examples/mcp-server/dataflow.yml new file mode 100644 index 00000000..14e2a09e --- /dev/null +++ b/examples/mcp-server/dataflow.yml @@ -0,0 +1,17 @@ +nodes: + - id: mcp-server + build: cargo build -p dora-mcp-server --release + path: ../../target/release/dora-mcp-server + outputs: + - counter + inputs: + counter_reply: counter/reply + env: + CONFIG: config.toml + + - id: counter + path: nodes/counter.py + inputs: + text: mcp-server/counter + outputs: + - reply \ No newline at end of file diff --git a/examples/mcp-server/empty_object.json b/examples/mcp-server/empty_object.json new file mode 100644 index 00000000..8127cade --- /dev/null +++ b/examples/mcp-server/empty_object.json @@ -0,0 +1,6 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "title": "EmptyObject", + "description": "Input parameters for the counter tool" +} \ No newline at end of file diff --git a/examples/mcp-server/nodes/counter.py b/examples/mcp-server/nodes/counter.py new file mode 100644 index 00000000..4c8fc95e --- /dev/null +++ b/examples/mcp-server/nodes/counter.py @@ -0,0 +1,26 @@ +"""TODO: Add docstring.""" + +import pyarrow as pa +from dora import Node +import json + + +node = Node() + +count = 0 +for event in node: + if event["type"] == "INPUT": + if 'metadata' in event: + data = json.loads(event["value"][0].as_py()) + name = data.get("name", "") + match name: + case "counter_increment": + count += 1 + node.send_output("reply", pa.array([f'{{"content":[{{"type": "text", "text": "{count}"}}]}}']), metadata=event["metadata"]) + case "counter_decrement": + count -= 1 + node.send_output("reply", pa.array([f'{{"content":[{{"type": "text", "text": "{count}"}}]}}']), metadata=event["metadata"]) + case "counter_get_value": + node.send_output("reply", pa.array([f'{{"content":[{{"type": "text", "text": "{count}"}}]}}']), metadata=event["metadata"]) + case _: + print(f"Unknown command: {name}") \ No newline at end of file diff --git a/node-hub/dora-mcp-host/Cargo.toml b/node-hub/dora-mcp-host/Cargo.toml new file mode 100644 index 00000000..309d2715 --- /dev/null +++ b/node-hub/dora-mcp-host/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "dora-mcp-host" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +documentation.workspace = true +description.workspace = true +license.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4.31" +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +figment = { version = "0.10.0", features = ["env", "json", "toml", "yaml"] } +futures = "0.3.31" +indexmap = { version = "2.6.0", features = ["serde"] } +mime_guess = "2.0.4" +outfox-openai = { version = "0.1.0", git = "https://github.com/outfox-ai/outfox.git" } +reqwest = { version = "0.12.22" } +rmcp = { version = "0.3.2", git = "https://github.com/modelcontextprotocol/rust-sdk.git", rev = "fbc7ab7", features = [ + "client", + "transport-child-process", + "transport-sse-client", + "transport-streamable-http-client", + "reqwest", +] } +salvo = { version = "0.81.0", default-features = false, features = [ + "affix-state", + "cors", + "server", + "http1", + "http2", +] } +serde = { version = "1.0.130", features = ["derive"] } +serde_json = "1.0.68" +thiserror = "2.0.12" +tokio = { version = "1.46.1", features = ["full"] } +tokio-stream = "0.1.11" +tracing = "0.1.27" +url = "2.2.2" +uuid = { version = "1.10", features = ["v4"] } diff --git a/node-hub/dora-mcp-host/src/client.rs b/node-hub/dora-mcp-host/src/client.rs new file mode 100644 index 00000000..4e37a9e2 --- /dev/null +++ b/node-hub/dora-mcp-host/src/client.rs @@ -0,0 +1,212 @@ +use tokio::sync::mpsc; + +use eyre::{eyre, Result}; +use futures::channel::oneshot; +use outfox_openai::spec::{CreateChatCompletionRequest, CreateChatCompletionResponse}; +use reqwest::Client as HttpClient; +use salvo::async_trait; + +use crate::config::{DeepseekConfig, DoraConfig, GeminiConfig, OpenaiConfig}; +use crate::{utils::get_env_or_value, ServerEvent}; + +#[async_trait] +pub trait ChatClient: Send + Sync { + async fn complete( + &self, + request: CreateChatCompletionRequest, + ) -> Result; +} + +#[derive(Debug)] +pub struct GeminiClient { + api_key: String, + api_url: String, + client: HttpClient, +} + +impl GeminiClient { + pub fn new(config: &GeminiConfig) -> Self { + let client = if config.proxy { + HttpClient::new() + } else { + HttpClient::builder() + .no_proxy() + .build() + .unwrap_or_else(|_| HttpClient::new()) + }; + + Self { + api_key: get_env_or_value(&config.api_key), + api_url: get_env_or_value(&config.api_url), + client, + } + } +} + +#[async_trait] +impl ChatClient for GeminiClient { + async fn complete( + &self, + request: CreateChatCompletionRequest, + ) -> Result { + let response = self + .client + .post(&self.api_url) + .header("X-goog-api-key", self.api_key.clone()) + .header("Content-Type", "application/json") + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + let error_text = response.text().await?; + return Err(eyre!("API Error: {}", error_text)); + } + let text_data = response.text().await?; + println!("Received response: {}", text_data); + let completion: CreateChatCompletionResponse = serde_json::from_str(&text_data) + .map_err(eyre::Report::from) + .unwrap(); + Ok(completion) + } +} + +#[derive(Debug)] +pub struct DeepseekClient { + api_key: String, + api_url: String, + client: HttpClient, +} + +impl DeepseekClient { + pub fn new(config: &DeepseekConfig) -> Self { + let client = if config.proxy { + HttpClient::new() + } else { + HttpClient::builder() + .no_proxy() + .build() + .unwrap_or_else(|_| HttpClient::new()) + }; + + Self { + api_key: get_env_or_value(&config.api_key), + api_url: get_env_or_value(&config.api_url), + client, + } + } +} + +#[async_trait] +impl ChatClient for DeepseekClient { + async fn complete( + &self, + request: CreateChatCompletionRequest, + ) -> Result { + let response = self + .client + .post(format!("{}/chat/completions", self.api_url)) + .header("Authorization", format!("Bearer {}", self.api_key)) + .header("Content-Type", "application/json") + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + let error_text = response.text().await?; + return Err(eyre!("API Error: {}", error_text)); + } + let text_data = response.text().await?; + println!("Received response: {}", text_data); + let completion: CreateChatCompletionResponse = + serde_json::from_str(&text_data).map_err(eyre::Report::from)?; + Ok(completion) + } +} + +#[derive(Debug)] +pub struct OpenaiClient { + api_key: String, + api_url: String, + client: HttpClient, +} + +impl OpenaiClient { + pub fn new(config: &OpenaiConfig) -> Self { + let client = if config.proxy { + HttpClient::new() + } else { + HttpClient::builder() + .no_proxy() + .build() + .unwrap_or_else(|_| HttpClient::new()) + }; + + Self { + api_key: get_env_or_value(&config.api_key), + api_url: get_env_or_value(&config.api_url), + client, + } + } +} + +#[async_trait] +impl ChatClient for OpenaiClient { + async fn complete( + &self, + request: CreateChatCompletionRequest, + ) -> Result { + let response = self + .client + .post(format!("{}/chat/completions", self.api_url)) + .header("Authorization", format!("Bearer {}", self.api_key)) + .header("Content-Type", "application/json") + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + let error_text = response.text().await?; + return Err(eyre!("API Error: {}", error_text)); + } + let text_data = response.text().await?; + println!("Received response: {}", text_data); + let completion: CreateChatCompletionResponse = + serde_json::from_str(&text_data).map_err(eyre::Report::from)?; + Ok(completion) + } +} + +#[derive(Debug)] +pub struct DoraClient { + output: String, + event_sender: mpsc::Sender, +} + +impl DoraClient { + pub fn new(config: &DoraConfig, event_sender: mpsc::Sender) -> Self { + Self { + output: config.output.clone(), + event_sender, + } + } +} + +#[async_trait] +impl ChatClient for DoraClient { + async fn complete( + &self, + request: CreateChatCompletionRequest, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.event_sender + .send(ServerEvent::CallNode { + output: self.output.clone(), + request, + reply: tx, + }) + .await?; + rx.await + .map_err(|e| eyre::eyre!("Failed to parse call tool result: {e}")) + } +} diff --git a/node-hub/dora-mcp-host/src/config.rs b/node-hub/dora-mcp-host/src/config.rs new file mode 100644 index 00000000..013ce828 --- /dev/null +++ b/node-hub/dora-mcp-host/src/config.rs @@ -0,0 +1,313 @@ +use std::{ + collections::HashMap, + path::PathBuf, + process::Stdio, + sync::{Arc, OnceLock}, +}; +use tokio::sync::mpsc; + +use figment::providers::{Env, Format, Json, Toml, Yaml}; +use figment::Figment; +use rmcp::{service::RunningService, transport::ConfigureCommandExt, RoleClient, ServiceExt}; +use serde::{Deserialize, Serialize}; + +use crate::client::{ChatClient, DeepseekClient, DoraClient, GeminiClient, OpenaiClient}; +use crate::{ChatSession, ServerEvent, tool::ToolSet}; + +pub static CONFIG: OnceLock = OnceLock::new(); + +pub fn init() { + let config_file = Env::var("CONFIG").unwrap_or("config.toml".into()); + let config_path = PathBuf::from(config_file); + if !config_path.exists() { + eprintln!("Config file not found at: {}", config_path.display()); + std::process::exit(1); + } + + let raw_config = match config_path + .extension() + .unwrap_or_default() + .to_str() + .unwrap_or_default() + { + "yaml" | "yml" => Figment::new().merge(Yaml::file(config_path)), + "json" => Figment::new().merge(Json::file(config_path)), + "toml" => Figment::new().merge(Toml::file(config_path)), + ext => { + eprintln!("unsupport config file format: {ext:?}"); + std::process::exit(1); + } + }; + + let conf = match raw_config.extract::() { + Ok(s) => s, + Err(e) => { + eprintln!("It looks like your config is invalid. The following error occurred: {e}"); + std::process::exit(1); + } + }; + + CONFIG.set(conf).expect("config should be set"); +} +pub fn get() -> &'static Config { + CONFIG.get().unwrap() +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Config { + #[serde(default = "default_listen_addr")] + pub listen_addr: String, + + #[serde(default = "default_endpoint")] + pub endpoint: Option, + + pub providers: Vec, + pub models: Vec, + + pub mcp: Option, + // #[serde(default = "default_true")] + // pub support_tool: bool, +} +fn default_listen_addr() -> String { + "0.0.0.0:8008".to_owned() +} +fn default_endpoint() -> Option { + Some("v1".to_owned()) +} + +// fn default_true() -> bool { +// true +// } + +#[derive(Clone, Debug, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum ProviderConfig { + Gemini(GeminiConfig), + Deepseek(DeepseekConfig), + Openai(OpenaiConfig), + Dora(DoraConfig), +} +impl ProviderConfig { + pub fn id(&self) -> &str { + match self { + ProviderConfig::Gemini(config) => &config.id, + ProviderConfig::Deepseek(config) => &config.id, + ProviderConfig::Openai(config) => &config.id, + ProviderConfig::Dora(config) => &config.id, + } + } +} + +#[derive(Clone, Debug, Deserialize)] +pub struct GeminiConfig { + pub id: String, + #[serde(default = "default_gemini_api_key")] + pub api_key: String, + #[serde(default = "default_gemini_api_url")] + pub api_url: String, + #[serde(default)] + pub proxy: bool, +} +fn default_gemini_api_key() -> String { + std::env::var("GEMINI_API_KEY").unwrap_or_default() +} +fn default_gemini_api_url() -> String { + std::env::var("GEMINI_API_URL").unwrap_or_else(|_|"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent".to_owned()) +} + +#[derive(Clone, Debug, Deserialize)] +pub struct DeepseekConfig { + pub id: String, + #[serde(default = "default_deepseek_api_key")] + pub api_key: String, + #[serde(default = "default_deepseek_api_url")] + pub api_url: String, + #[serde(default)] + pub proxy: bool, +} +fn default_deepseek_api_key() -> String { + std::env::var("DEEPSEEK_API_KEY").unwrap_or_default() +} +fn default_deepseek_api_url() -> String { + std::env::var("DEEPSEEK_API_URL").unwrap_or_else(|_| "https://api.deepseek.com".to_owned()) +} + +#[derive(Clone, Debug, Deserialize)] +pub struct OpenaiConfig { + pub id: String, + #[serde(default = "default_openai_api_key")] + pub api_key: String, + #[serde(default = "default_openai_api_url")] + pub api_url: String, + #[serde(default)] + pub proxy: bool, +} +fn default_openai_api_key() -> String { + std::env::var("OPENAI_API_KEY").unwrap_or_default() +} +fn default_openai_api_url() -> String { + std::env::var("OPENAI_API_URL") + .unwrap_or_else(|_| "https://api.openai.com/v1/chat/completions".to_owned()) +} + +#[derive(Clone, Debug, Deserialize)] +pub struct DoraConfig { + pub id: String, + pub output: String, +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(tag = "model", rename_all = "snake_case")] +pub struct ModelConfig { + pub id: String, + pub object: Option, + pub created: Option, + pub owned_by: Option, + + // #[serde(default)] + // pub default: bool, + pub route: ModelRouteConfig, +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(tag = "route", rename_all = "snake_case")] +pub struct ModelRouteConfig { + pub provider: String, + pub model: Option, +} + +#[derive(Clone, Default, Debug, Deserialize)] +pub struct McpConfig { + #[serde(default)] + pub servers: Vec, +} +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct McpServerConfig { + pub name: String, + #[serde(flatten)] + pub transport: McpServerTransportConfig, +} +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "protocol", rename_all = "snake_case")] +pub enum McpServerTransportConfig { + Streamable { + url: String, + }, + Sse { + url: String, + }, + Stdio { + command: String, + #[serde(default)] + args: Vec, + #[serde(default)] + envs: HashMap, + }, +} + +impl McpServerTransportConfig { + pub async fn start(&self) -> eyre::Result> { + let client = match self { + McpServerTransportConfig::Streamable { url } => { + for _ in 0..5 { + let transport = + rmcp::transport::StreamableHttpClientTransport::from_uri(url.to_string()); + match ().serve(transport).await { + Ok(client) => return Ok(client), + Err(e) => { + println!("failed to start streamable transport: {e}"); + tracing::warn!("failed to start streamable transport: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + } + } + eyre::bail!("failed to start streamable transport after 5 attempts"); + } + McpServerTransportConfig::Sse { url } => { + let transport = + rmcp::transport::sse_client::SseClientTransport::start(url.to_owned()).await?; + ().serve(transport).await? + } + McpServerTransportConfig::Stdio { + command, + args, + envs, + } => { + let transport = rmcp::transport::TokioChildProcess::new( + tokio::process::Command::new(command).configure(|cmd| { + cmd.args(args) + .envs(envs) + .stderr(Stdio::inherit()) + .stdout(Stdio::inherit()); + }), + )?; + ().serve(transport).await? + } + }; + Ok(client) + } +} + +impl Config { + pub async fn create_mcp_clients( + &self, + ) -> eyre::Result>> { + let mut clients = HashMap::new(); + + if let Some(mcp_config) = &self.mcp { + for server in &mcp_config.servers { + let client = server.transport.start().await?; + clients.insert(server.name.clone(), client); + } + } + + Ok(clients) + } + + fn create_chat_clients( + &self, + server_events_tx: mpsc::Sender, + ) -> HashMap> { + let mut clients: HashMap> = HashMap::new(); + for provider in &self.providers { + let client: Arc = match provider { + ProviderConfig::Gemini(config) => Arc::new(GeminiClient::new(config)), + ProviderConfig::Deepseek(config) => Arc::new(DeepseekClient::new(config)), + ProviderConfig::Openai(config) => Arc::new(OpenaiClient::new(config)), + ProviderConfig::Dora(config) => { + Arc::new(DoraClient::new(config, server_events_tx.clone())) + } + }; + clients.insert(provider.id().to_owned(), client); + } + clients + } + + pub async fn create_session( + &self, + server_events_tx: mpsc::Sender, + ) -> eyre::Result { + let mut tool_set = ToolSet::default(); + + if self.mcp.is_some() { + let mcp_clients = self.create_mcp_clients().await?; + + for (name, client) in mcp_clients.iter() { + tracing::info!("load MCP tool: {name}"); + let server = client.peer().clone(); + let tools = crate::get_mcp_tools(server.clone()).await?; + + for tool in tools { + tool_set.add_tool(tool); + } + } + tool_set.set_clients(mcp_clients); + } + + Ok(ChatSession::new( + self.create_chat_clients(server_events_tx), + get().models.clone(), + tool_set, + )) + } +} diff --git a/node-hub/dora-mcp-host/src/error.rs b/node-hub/dora-mcp-host/src/error.rs new file mode 100644 index 00000000..336a020c --- /dev/null +++ b/node-hub/dora-mcp-host/src/error.rs @@ -0,0 +1,62 @@ +use salvo::async_trait; +use salvo::http::{StatusCode, StatusError}; +use salvo::prelude::{Depot, Request, Response, Writer}; +use thiserror::Error; + +use crate::ServerEvent; + +#[allow(clippy::large_enum_variant)] +#[derive(Error, Debug)] +pub enum AppError { + #[error("public: `{0}`")] + Public(String), + #[error("internal: `{0}`")] + Internal(String), + #[error("salvo internal error: `{0}`")] + Salvo(#[from] ::salvo::Error), + #[error("serde json: `{0}`")] + SerdeJson(#[from] serde_json::error::Error), + #[error("http: `{0}`")] + StatusError(#[from] salvo::http::StatusError), + #[error("http parse: `{0}`")] + HttpParse(#[from] salvo::http::ParseError), + #[error("recv: `{0}`")] + Recv(#[from] tokio::sync::oneshot::error::RecvError), + #[error("send: `{0}`")] + Send(#[from] tokio::sync::mpsc::error::SendError), + #[error("canceled: `{0}`")] + Canceled(#[from] futures::channel::oneshot::Canceled), + #[error("error report: `{0}`")] + ErrReport(#[from] eyre::Report), + // #[error("reqwest: `{0}`")] + // Reqwest(#[from] reqwest::Error), +} + +impl AppError { + pub fn public>(msg: S) -> Self { + Self::Public(msg.into()) + } + + pub fn internal>(msg: S) -> Self { + Self::Internal(msg.into()) + } +} + +#[async_trait] +impl Writer for AppError { + async fn write(mut self, _req: &mut Request, _depot: &mut Depot, res: &mut Response) { + let code = match &self { + AppError::StatusError(e) => e.code, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + res.status_code(code); + let data = match self { + AppError::Salvo(e) => StatusError::internal_server_error().brief(e.to_string()), + AppError::Public(msg) => StatusError::internal_server_error().brief(msg), + AppError::Internal(_msg) => StatusError::internal_server_error(), + AppError::StatusError(e) => e, + e => StatusError::internal_server_error().brief(e.to_string()), + }; + res.render(data); + } +} diff --git a/node-hub/dora-mcp-host/src/main.rs b/node-hub/dora-mcp-host/src/main.rs new file mode 100644 index 00000000..79aaa28b --- /dev/null +++ b/node-hub/dora-mcp-host/src/main.rs @@ -0,0 +1,183 @@ +use std::collections::HashMap; + +use dora_node_api::{ + arrow::array::{AsArray, StringArray}, + dora_core::config::DataId, + merged::{MergeExternalSend, MergedEvent}, + DoraNode, Event, MetadataParameters, Parameter, +}; +use eyre::{Context, ContextCompat}; +use futures::channel::oneshot; +use outfox_openai::spec::{ + ChatChoice, ChatCompletionResponseMessage, CompletionUsage, CreateChatCompletionRequest, + CreateChatCompletionResponse, FinishReason, Role, +}; +use salvo::cors::*; +use salvo::prelude::*; +use tokio::sync::mpsc; + +mod client; +mod error; +mod routing; +mod utils; +use error::AppError; +mod config; +mod session; +use session::ChatSession; +mod tool; +use tool::get_mcp_tools; +use utils::gen_call_id; + +pub type AppResult = Result; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + config::init(); + + let (server_events_tx, server_events_rx) = mpsc::channel(3); + let server_events = tokio_stream::wrappers::ReceiverStream::new(server_events_rx); + + let config = config::get(); + let chat_session = config + .create_session(server_events_tx.clone()) + .await + .context("failed to create chat session")?; + + let mut reply_channels: HashMap< + String, + ( + oneshot::Sender, + u32, + Option, + ), + > = HashMap::new(); + + let acceptor = TcpListener::new(&config.listen_addr).bind().await; + tokio::spawn(async move { + let service = Service::new(routing::root(config.endpoint.clone(), chat_session.into())) + .hoop( + Cors::new() + .allow_origin(AllowOrigin::any()) + .allow_methods(AllowMethods::any()) + .allow_headers(AllowHeaders::any()) + .into_handler(), + ); + Server::new(acceptor).serve(service).await; + if let Err(err) = server_events_tx.send(ServerEvent::Result(Ok(()))).await { + tracing::warn!("server result channel closed: {err}"); + } + }); + + let (mut node, events) = DoraNode::init_from_env()?; + + let merged = events.merge_external_send(server_events); + let events = futures::executor::block_on_stream(merged); + + for event in events { + match event { + MergedEvent::External(event) => match event { + ServerEvent::Result(server_result) => { + server_result.context("server failed")?; + break; + } + ServerEvent::CallNode { + output, + request, + reply, + } => { + let mut metadata = MetadataParameters::default(); + let call_id = gen_call_id(); + metadata.insert("__dora_call_id".into(), Parameter::String(call_id.clone())); + let texts = request + .messages + .iter() + .map(|msg| msg.to_texts().join("\n")) + .collect::>(); + node.send_output( + DataId::from(output), + Default::default(), + StringArray::from(texts), + ) + .context("failed to send dora output")?; + + reply_channels.insert(call_id, (reply, 0_u32, Some(request.model))); + } + }, + MergedEvent::Dora(event) => match event { + Event::Input { id, data, metadata } => { + let Some(Parameter::String(call_id)) = + metadata.parameters.get("__dora_call_id") + else { + tracing::warn!("No call ID found in metadata for id: {}", id); + continue; + }; + let (reply_channel, prompt_tokens, model) = + reply_channels.remove(call_id).context("no reply channel")?; + let data = data.as_string::(); + let data = data.iter().fold("".to_string(), |mut acc, s| { + if let Some(s) = s { + acc.push('\n'); + acc.push_str(s); + } + acc + }); + + let data = CreateChatCompletionResponse { + id: format!("completion-{}", uuid::Uuid::new_v4()), + object: "chat.completion".to_string(), + created: chrono::Utc::now().timestamp() as u32, + model: model.unwrap_or_default(), + usage: Some(CompletionUsage { + prompt_tokens, + completion_tokens: data.len() as u32, + total_tokens: prompt_tokens + data.len() as u32, + prompt_tokens_details: None, + completion_tokens_details: None, + }), + choices: vec![ChatChoice { + index: 0, + message: ChatCompletionResponseMessage { + role: Role::Assistant, + content: Some(data), + tool_calls: None, + audio: None, + refusal: None, + }, + finish_reason: Some(FinishReason::Stop), + logprobs: None, + }], + service_tier: None, + system_fingerprint: None, + }; + + if reply_channel.send(data).is_err() { + tracing::warn!( + "failed to send chat completion reply because channel closed early" + ); + } + } + Event::Stop(_) => { + break; + } + Event::InputClosed { id, .. } => { + tracing::info!("Input channel closed for id: {}", id); + } + event => { + eyre::bail!("unexpected event: {:#?}", event) + } + }, + } + } + + Ok(()) +} + +#[allow(clippy::large_enum_variant)] +pub enum ServerEvent { + Result(eyre::Result<()>), + CallNode { + output: String, + request: CreateChatCompletionRequest, + reply: oneshot::Sender, + }, +} diff --git a/node-hub/dora-mcp-host/src/routing.rs b/node-hub/dora-mcp-host/src/routing.rs new file mode 100644 index 00000000..ee500dfb --- /dev/null +++ b/node-hub/dora-mcp-host/src/routing.rs @@ -0,0 +1,118 @@ +use std::sync::Arc; + +use outfox_openai::spec::{CreateChatCompletionRequest, Model}; +use salvo::prelude::*; + +use crate::session::ChatSession; +use crate::AppResult; + +pub fn root(endpoint: Option, chat_session: Arc) -> Router { + Router::with_hoop(affix_state::inject(chat_session)) + .push( + if let Some(endpoint) = endpoint { + Router::with_path(endpoint) + } else { + Router::new() + } + .push(Router::with_path("chat/completions").post(chat_completions)) + .push(Router::with_path("models").get(list_models)) + .push(Router::with_path("embeddings").get(todo)) + .push(Router::with_path("files").get(todo)) + .push(Router::with_path("chunks").get(todo)) + .push(Router::with_path("info").get(todo)) + .push(Router::with_path("realtime").get(todo)), + ) + .push(Router::with_path("{**path}").get(index)) +} + +#[handler] +async fn todo(res: &mut Response) { + res.render(Text::Plain("TODO")); +} +#[handler] +async fn index(res: &mut Response) { + res.render(Text::Plain("Hello")); +} + +#[handler] +async fn list_models(depot: &mut Depot, res: &mut Response) { + let chat_session = depot + .obtain::>() + .expect("chat session must be exists"); + + let mut models = Vec::new(); + for model in &chat_session.models { + // TODO: fill correct data + models.push(Model { + id: model.id.clone(), + object: model.object.clone().unwrap_or("object".into()), + created: model.created.unwrap_or_default(), + owned_by: model.owned_by.clone().unwrap_or("dora".into()), + }); + } + res.render(Json(models)); +} + +#[handler] +async fn chat_completions( + req: &mut Request, + depot: &mut Depot, + res: &mut Response, +) -> AppResult<()> { + tracing::info!("Handling the coming chat completion request."); + let chat_session = depot + .obtain::>() + .expect("chat session must be exists"); + + tracing::info!("Prepare the chat completion request."); + + let mut chat_request = match req.parse_json::().await { + Ok(chat_requst) => chat_requst, + Err(e) => { + println!( + "parse request error: {e}, payload: {}", + String::from_utf8_lossy(req.payload().await?) + ); + return Err(e.into()); + } + }; + + // check if the user id is provided + if chat_request.user.is_none() { + chat_request.user = Some(crate::utils::gen_chat_id()) + }; + let id = chat_request.user.clone().unwrap(); + + // log user id + tracing::info!("user: {}", chat_request.user.clone().unwrap()); + // let stream = chat_request.stream; + + // let (tx, rx) = oneshot::channel(); + // request_tx + // .send(ServerEvent::CompletionRequest { + // request: chat_request, + // reply: tx, + // }) + // .await?; + + // if let Some(true) = stream { + // // let result = async { + // // let chat_completion_object = rx.await?; + // // Ok::<_, AppError>(serde_json::to_string(&chat_completion_object)?) + // // }; + // let result = chat_session.chat(chat_request).await?; + // let stream = futures::stream::once(result); + + // let _ = res.add_header("Content-Type", "text/event-stream", true); + // let _ = res.add_header("Cache-Control", "no-cache", true); + // let _ = res.add_header("Connection", "keep-alive", true); + // let _ = res.add_header("user", id, true); + // res.stream(stream); + // } else { + let response = chat_session.chat(chat_request).await?; + let _ = res.add_header("user", id, true); + res.render(Json(response)); + // }; + tracing::info!("Send the chat completion response."); + Ok(()) +} diff --git a/node-hub/dora-mcp-host/src/session.rs b/node-hub/dora-mcp-host/src/session.rs new file mode 100644 index 00000000..28bf0d9c --- /dev/null +++ b/node-hub/dora-mcp-host/src/session.rs @@ -0,0 +1,221 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use eyre::Result; +use outfox_openai::spec::{ + ChatCompletionRequestMessage, ChatCompletionRequestUserMessage, + ChatCompletionRequestUserMessageContent, ChatCompletionResponseMessage, ChatCompletionTool, + ChatCompletionToolType, CreateChatCompletionRequest, CreateChatCompletionResponse, + FunctionCall, FunctionObject, +}; + +use crate::client::ChatClient; +use crate::config::ModelConfig; +use crate::tool::ToolSet; + +pub struct ChatSession { + pub chat_clients: HashMap>, + pub models: Vec, + pub tool_set: ToolSet, + pub messages: Mutex>, +} + +impl ChatSession { + pub fn new( + chat_clients: HashMap>, + models: Vec, + tool_set: ToolSet, + ) -> Self { + Self { + chat_clients, + models, + tool_set, + messages: Default::default(), + } + } + + // pub fn default_model(&self) -> Option<&str> { + // let model = self + // .models + // .iter() + // .find(|model| model.default) + // .map(|model| &*model.id); + // if model.is_none() { + // self.models.first().map(|model| &*model.id) + // } else { + // model + // } + // } + + pub fn route(&self, model: &str) -> Option<(Arc, String)> { + let model = self.models.iter().find(|m| m.id == model)?; + let route = &model.route; + let client = self.chat_clients.get(&route.provider)?; + Some(( + client.clone(), + route.model.clone().unwrap_or(model.id.clone()), + )) + } + + // pub fn add_system_prompt(&mut self, prompt: impl ToString) { + // let mut messages = self.messages.lock().expect("messages should locked"); + // messages.push( + // ChatCompletionRequestSystemMessage { + // content: PartibleTextContent::Text(prompt.to_string()), + // name: None, + // } + // .into(), + // ); + // } + + // pub fn get_tools(&self) -> Vec> { + // self.tool_set.tools() + // } + + pub async fn analyze_tool_call(&self, response: &ChatCompletionResponseMessage) { + let mut tool_calls_func = Vec::new(); + if let Some(tool_calls) = response.tool_calls.as_ref() { + for tool_call in tool_calls { + // if tool_call.r#type == "function" { + tool_calls_func.push(tool_call.function.clone()); + // } + } + } else { + // check if message contains tool call + if let Some(text) = &response.content { + if text.contains("Tool:") { + let lines: Vec<&str> = text.split('\n').collect(); + // simple parse tool call + let mut tool_name = None; + let mut args_text = Vec::new(); + let mut parsing_args = false; + + for line in lines { + if line.starts_with("Tool:") { + tool_name = line.strip_prefix("Tool:").map(|s| s.trim().to_string()); + parsing_args = false; + } else if line.starts_with("Inputs:") { + parsing_args = true; + } else if parsing_args { + args_text.push(line.trim()); + } + } + if let Some(name) = tool_name { + tool_calls_func.push(FunctionCall { + name, + arguments: args_text.join("\n"), + }); + } + } + } + } + // call tool + for tool_call in tool_calls_func { + let tool = self.tool_set.get_tool(&tool_call.name); + if let Some(tool) = tool { + // call tool + let args = serde_json::from_str::(&tool_call.arguments) + .unwrap_or_default(); + match tool.call(args).await { + Ok(result) => { + if result.is_error.is_some_and(|b| b) { + let mut messages = + self.messages.lock().expect("messages should locked"); + messages.push( + ChatCompletionRequestUserMessage::new( + "tool call failed, mcp call error", + ) + .into(), + ); + } else if let Some(contents) = &result.content { + contents.iter().for_each(|content| { + if let Some(content_text) = content.as_text() { + let json_result = serde_json::from_str::( + &content_text.text, + ) + .unwrap_or_default(); + let pretty_result = + serde_json::to_string_pretty(&json_result).unwrap(); + tracing::debug!("call tool result: {}", pretty_result); + let mut messages = + self.messages.lock().expect("messages should locked"); + messages.push( + ChatCompletionRequestUserMessage::new(format!( + "call tool result: {pretty_result}" + )) + .into(), + ); + } + }); + } + } + Err(e) => { + tracing::error!("tool call failed: {}", e); + let mut messages = self.messages.lock().expect("messages should locked"); + messages.push( + ChatCompletionRequestUserMessage { + content: ChatCompletionRequestUserMessageContent::Text(format!( + "tool call failed: {e}" + )), + name: None, + } + .into(), + ); + } + } + } else { + println!("tool not found: {}", tool_call.name); + } + } + } + pub async fn chat( + &self, + mut request: CreateChatCompletionRequest, + ) -> Result { + { + let mut messages = self.messages.lock().expect("messages should locked"); + for message in std::mem::take(&mut request.messages) { + messages.push(message); + } + request.messages = messages.clone(); + } + let tools = self.tool_set.tools(); + let tool_definitions = if !tools.is_empty() { + Some( + tools + .iter() + .map(|tool| ChatCompletionTool { + kind: ChatCompletionToolType::Function, + function: FunctionObject { + name: tool.name(), + description: Some(tool.description()), + parameters: Some(tool.parameters()), + strict: None, + }, + }) + .collect(), + ) + } else { + None + }; + + let (client, model) = self.route(&request.model).expect("failed to route model"); + request.model = model.clone(); + request.tools = tool_definitions; + + // send request + let response = client.complete(request).await?; + // get choice + if let Some(choice) = response.choices.first() { + // analyze tool call + self.analyze_tool_call(&choice.message).await; + let request = { + let messages = self.messages.lock().expect("messages should locked"); + CreateChatCompletionRequest::new(model, messages.clone()) + }; + client.complete(request).await + } else { + Ok(response) + } + } +} diff --git a/node-hub/dora-mcp-host/src/tool.rs b/node-hub/dora-mcp-host/src/tool.rs new file mode 100644 index 00000000..6740c7c6 --- /dev/null +++ b/node-hub/dora-mcp-host/src/tool.rs @@ -0,0 +1,95 @@ +use std::{collections::HashMap, sync::Arc}; + +use eyre::Result; +use rmcp::{ + model::{CallToolRequestParam, CallToolResult, Tool as McpTool}, + service::{RunningService, ServerSink}, + RoleClient, +}; +use salvo::async_trait; +use serde_json::Value; + +#[async_trait] +pub trait Tool: Send + Sync { + fn name(&self) -> String; + fn description(&self) -> String; + fn parameters(&self) -> Value; + async fn call(&self, args: Value) -> Result; +} + +pub struct McpToolAdapter { + tool: McpTool, + server: ServerSink, +} + +impl McpToolAdapter { + pub fn new(tool: McpTool, server: ServerSink) -> Self { + Self { tool, server } + } +} + +#[async_trait] +impl Tool for McpToolAdapter { + fn name(&self) -> String { + self.tool.name.clone().to_string() + } + + fn description(&self) -> String { + self.tool + .description + .clone() + .unwrap_or_default() + .to_string() + } + + fn parameters(&self) -> Value { + serde_json::to_value(&self.tool.input_schema).unwrap_or(serde_json::json!({})) + } + + async fn call(&self, args: Value) -> Result { + let arguments = match args { + Value::Object(map) => Some(map), + _ => None, + }; + let call_result = self + .server + .call_tool(CallToolRequestParam { + name: self.tool.name.clone(), + arguments, + }) + .await?; + + Ok(call_result) + } +} +#[derive(Default)] +pub struct ToolSet { + tools: HashMap>, + clients: HashMap>, +} + +impl ToolSet { + pub fn set_clients(&mut self, clients: HashMap>) { + self.clients = clients; + } + + pub fn add_tool(&mut self, tool: T) { + self.tools.insert(tool.name(), Arc::new(tool)); + } + + pub fn get_tool(&self, name: &str) -> Option> { + self.tools.get(name).cloned() + } + + pub fn tools(&self) -> Vec> { + self.tools.values().cloned().collect() + } +} + +pub async fn get_mcp_tools(server: ServerSink) -> Result> { + let tools = server.list_all_tools().await?; + Ok(tools + .into_iter() + .map(|tool| McpToolAdapter::new(tool, server.clone())) + .collect()) +} diff --git a/node-hub/dora-mcp-host/src/utils.rs b/node-hub/dora-mcp-host/src/utils.rs new file mode 100644 index 00000000..60e25ff1 --- /dev/null +++ b/node-hub/dora-mcp-host/src/utils.rs @@ -0,0 +1,14 @@ +pub(crate) fn gen_call_id() -> String { + format!("call-{}", uuid::Uuid::new_v4()) +} +pub(crate) fn gen_chat_id() -> String { + format!("chatcmpl-{}", uuid::Uuid::new_v4()) +} + +pub fn get_env_or_value(value: &str) -> String { + if let Some(stripped) = value.strip_prefix("env:") { + std::env::var(stripped).unwrap_or_else(|_| value.to_string()) + } else { + value.to_string() + } +} diff --git a/node-hub/dora-mcp-server/Cargo.toml b/node-hub/dora-mcp-server/Cargo.toml new file mode 100644 index 00000000..18de0ebf --- /dev/null +++ b/node-hub/dora-mcp-server/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "dora-mcp-server" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +documentation.workspace = true +description.workspace = true +license.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4.31" +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +futures = "0.3.31" +indexmap = { version = "2.6.0", features = ["serde"] } +mime_guess = "2.0.4" +rmcp = { version = "0.2.1", features = ["server"] } +salvo = { version = "0.80.0", default-features = false, features = ["affix-state", "cors", "server", "http1", "http2"] } +serde = { version = "1.0.130", features = ["derive"] } +serde_json = "1.0.68" +thiserror = "2.0.12" +tokio = { version = "1.46.1", features = ["full"] } +tokio-stream = "0.1.11" +tracing = "0.1.27" +url = "2.2.2" +uuid = { version = "1.10", features = ["v4"] } +figment = { version = "0.10.0", features = ["env", "json", "toml", "yaml"] } diff --git a/node-hub/dora-mcp-server/README.md b/node-hub/dora-mcp-server/README.md new file mode 100644 index 00000000..af5da467 --- /dev/null +++ b/node-hub/dora-mcp-server/README.md @@ -0,0 +1,66 @@ +# Dora MCP Server + +This node can provide an MCP Server, which will proxy the request to one or more other nodes in the dora application. + +Dora MCP Server is still experimental and may change in the future. + + +## How to use + +```yaml +nodes: + - id: mcp-server + build: cargo build -p dora-mcp-server --release + path: ../../target/release/dora-mcp-server + outputs: + - counter + inputs: + counter_reply: counter/reply + env: + MCP_SERVER_CONFIG: config.toml +``` + +use `MCP_SERVER_CONFIG` set config file, it supports toml, json or yaml format. + +An example config file: + +```toml +name = "MCP Server Example" +version = "0.1.0" + +# You can set your custom listen address and endpoint here. +# Default listen address is "0.0.0.0:8008" and endpoint is "mcp". +# In this example, the final service url is: http://0.0.0.0:8181/mcp +listen_addr = "0.0.0.0:8181" +endpoint = "mcp" + +[[mcp_tools]] +name = "counter_decrement" # (Required) type: String, Unique identifier for the tool +title = "Decrement Counter" # (Optional) type: String, Human-readable name of the tool for display purposes +input_schema = "empty_object.json" # (Required) JSON Schema defining expected parameters +output = "counter" # (Required) type: String, Set the output name +[mcp_tools.annotations] # (Optional) Additional properties describing a Tool to clients +title = "decrement current value of the counter" # type: String, A human-readable title for the tool + +[[mcp_tools]] +name = "counter_increment" +title = "Increment Counter" +input_schema = "empty_object.json" +output = "counter" +[mcp_tools.annotations] +title = "Increment current value of the counter" + +[[mcp_tools]] +name = "counter_get_value" +title = "Get Counter Value" +input_schema = "empty_object.json" +output = "counter" +[mcp_tools.annotations] +title = "Get the current value of the counter" +``` + +You can use mpc inspector to test: + +```bash +npx @modelcontextprotocol/inspector +``` \ No newline at end of file diff --git a/node-hub/dora-mcp-server/src/config.rs b/node-hub/dora-mcp-server/src/config.rs new file mode 100644 index 00000000..58648ac9 --- /dev/null +++ b/node-hub/dora-mcp-server/src/config.rs @@ -0,0 +1,102 @@ +use std::path::{Path, PathBuf}; +use std::sync::OnceLock; + +use figment::providers::{Env, Format, Json, Toml, Yaml}; +use figment::Figment; +use rmcp::model::{JsonObject, ToolAnnotations}; +use serde::{Deserialize, Serialize}; + +pub static CONFIG: OnceLock = OnceLock::new(); + +fn figment_from_path>(path: P) -> Figment { + let ext = path + .as_ref() + .extension() + .and_then(|s| s.to_str()) + .unwrap_or_default(); + match ext { + "yaml" | "yml" => Figment::new().merge(Yaml::file(path)), + "json" => Figment::new().merge(Json::file(path)), + "toml" => Figment::new().merge(Toml::file(path)), + _ => panic!("Unsupported config file format: {ext}"), + } +} +pub fn init() { + let config_file = Env::var("CONFIG").unwrap_or("config.toml".into()); + let config_path = PathBuf::from(config_file); + if !config_path.exists() { + eprintln!("Config file not found at: {}", config_path.display()); + std::process::exit(1); + } + + let raw_config = figment_from_path(config_path); + let conf = match raw_config.extract::() { + Ok(s) => s, + Err(e) => { + eprintln!("It looks like your config is invalid. The following error occurred: {e}"); + std::process::exit(1); + } + }; + + CONFIG.set(conf).expect("config should be set"); +} +pub fn get() -> &'static Config { + CONFIG.get().unwrap() +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Config { + #[serde(default = "default_listen_addr")] + pub listen_addr: String, + + #[serde(default = "default_endpoint")] + pub endpoint: Option, + + pub name: String, + pub version: String, + + pub mcp_tools: Vec, +} +fn default_listen_addr() -> String { + "0.0.0.0:8008".to_owned() +} +fn default_endpoint() -> Option { + Some("mcp".to_owned()) +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct McpToolConfig { + /// Unique identifier for the tool + pub name: String, + /// Optional human-readable name of the tool for display purposes + #[serde(skip_serializing_if = "Option::is_none")] + pub title: Option, + /// Human-readable description of functionality + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + /// A JSON Schema object defining the expected parameters for the tool + pub input_schema: InputSchema, + #[serde(skip_serializing_if = "Option::is_none")] + /// Optional properties describing tool behavior + pub annotations: Option, + + pub output: String, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum InputSchema { + Object(JsonObject), + FilePath(String), +} + +impl InputSchema { + pub fn schema(&self) -> JsonObject { + match self { + InputSchema::Object(obj) => obj.clone(), + InputSchema::FilePath(path) => figment_from_path(path) + .extract::() + .expect("should read input schema from file"), + } + } +} diff --git a/node-hub/dora-mcp-server/src/error.rs b/node-hub/dora-mcp-server/src/error.rs new file mode 100644 index 00000000..350a26b9 --- /dev/null +++ b/node-hub/dora-mcp-server/src/error.rs @@ -0,0 +1,57 @@ +use salvo::async_trait; +use salvo::http::{StatusCode, StatusError}; +use salvo::prelude::{Depot, Request, Response, Writer}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum AppError { + #[error("public: `{0}`")] + Public(String), + #[error("internal: `{0}`")] + Internal(String), + #[error("salvo internal error: `{0}`")] + Salvo(#[from] ::salvo::Error), + #[error("serde json: `{0}`")] + SerdeJson(#[from] serde_json::error::Error), + #[error("http: `{0}`")] + StatusError(#[from] salvo::http::StatusError), + #[error("http parse: `{0}`")] + HttpParse(#[from] salvo::http::ParseError), + #[error("recv: `{0}`")] + Recv(#[from] tokio::sync::oneshot::error::RecvError), + #[error("canceled: `{0}`")] + Canceled(#[from] futures::channel::oneshot::Canceled), + #[error("error report: `{0}`")] + ErrReport(#[from] eyre::Report), + // #[error("reqwest: `{0}`")] + // Reqwest(#[from] reqwest::Error), +} + +impl AppError { + pub fn public>(msg: S) -> Self { + Self::Public(msg.into()) + } + + pub fn internal>(msg: S) -> Self { + Self::Internal(msg.into()) + } +} + +#[async_trait] +impl Writer for AppError { + async fn write(mut self, _req: &mut Request, _depot: &mut Depot, res: &mut Response) { + let code = match &self { + AppError::StatusError(e) => e.code, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + res.status_code(code); + let data = match self { + AppError::Salvo(e) => StatusError::internal_server_error().brief(e.to_string()), + AppError::Public(msg) => StatusError::internal_server_error().brief(msg), + AppError::Internal(_msg) => StatusError::internal_server_error(), + AppError::StatusError(e) => e, + e => StatusError::internal_server_error().brief(e.to_string()), + }; + res.render(data); + } +} diff --git a/node-hub/dora-mcp-server/src/main.rs b/node-hub/dora-mcp-server/src/main.rs new file mode 100644 index 00000000..883f1217 --- /dev/null +++ b/node-hub/dora-mcp-server/src/main.rs @@ -0,0 +1,176 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use dora_node_api::{ + arrow::array::{AsArray, StringArray}, + dora_core::config::DataId, + merged::{MergeExternalSend, MergedEvent}, + DoraNode, Event, MetadataParameters, Parameter, +}; + +use eyre::{Context, ContextCompat}; +use futures::channel::oneshot; +use rmcp::model::{ClientRequest, JsonRpcRequest}; +use salvo::cors::*; +use salvo::prelude::*; +use tokio::sync::mpsc; + +mod mcp_server; +use mcp_server::McpServer; +mod error; +mod routing; +use error::AppError; +mod config; +mod utils; +use config::Config; +use utils::gen_call_id; + +pub type AppResult = Result; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + config::init(); + + let (server_events_tx, server_events_rx) = mpsc::channel(3); + let server_events = tokio_stream::wrappers::ReceiverStream::new(server_events_rx); + + let mut reply_channels: HashMap> = HashMap::new(); + + let config = config::get(); + let mcp_server = Arc::new(McpServer::new(config)); + + salvo::http::request::set_global_secure_max_size(8_000_000); // set max size to 8MB + let acceptor = TcpListener::new(&config.listen_addr).bind().await; + tokio::spawn({ + let server_events_tx = server_events_tx.clone(); + let mcp_server = mcp_server.clone(); + async move { + let service = Service::new(routing::root( + config.endpoint.clone(), + mcp_server, + server_events_tx.clone(), + )) + .hoop( + Cors::new() + .allow_origin(AllowOrigin::any()) + .allow_methods(AllowMethods::any()) + .allow_headers(AllowHeaders::any()) + .into_handler(), + ); + Server::new(acceptor).serve(service).await; + if let Err(err) = server_events_tx.send(ServerEvent::Result(Ok(()))).await { + tracing::warn!("server result channel closed: {err}"); + } + } + }); + + let (mut node, events) = DoraNode::init_from_env()?; + let merged = events.merge_external_send(server_events); + let events = futures::executor::block_on_stream(merged); + + for event in events { + match event { + MergedEvent::External(event) => match event { + ServerEvent::Result(server_result) => { + server_result.context("server failed")?; + break; + } + ServerEvent::CallNode { + output, + data, + reply, + } => { + let mut metadata = MetadataParameters::default(); + let call_id = gen_call_id(); + metadata.insert("__dora_call_id".into(), Parameter::String(call_id.clone())); + node.send_output( + DataId::from(output.clone()), + metadata, + StringArray::from(vec![data]), + ) + .context("failed to send dora output")?; + + reply_channels.insert(call_id, reply); + } + }, + MergedEvent::Dora(event) => match event { + Event::Input { id, data, metadata } => { + match id.as_str() { + "request" => { + let data = data.as_string::().iter().fold( + "".to_string(), + |mut acc, s| { + if let Some(s) = s { + acc.push('\n'); + acc.push_str(s); + } + acc + }, + ); + + let request = + serde_json::from_str::>(&data) + .context("failed to parse call tool from string")?; + + if let Ok(result) = + mcp_server.handle_request(request, &server_events_tx).await + { + node.send_output( + DataId::from("response".to_owned()), + metadata.parameters, + StringArray::from( + vec![serde_json::to_string(&result).unwrap()], + ), + ) + .context("failed to send dora output")?; + } + } + _ => { + let Some(Parameter::String(call_id)) = + metadata.parameters.get("__dora_call_id") + else { + tracing::warn!("No call ID found in metadata for id: {}", id); + continue; + }; + let reply_channel = + reply_channels.remove(call_id).context("no reply channel")?; + let data = data.as_string::(); + let data = data.iter().fold("".to_string(), |mut acc, s| { + if let Some(s) = s { + acc.push('\n'); + acc.push_str(s); + } + acc + }); + if reply_channel.send(data).is_err() { + tracing::warn!("failed to send reply because channel closed early"); + } + // node.send_output(DataId::from("response".to_owned()), metadata, data) + // .context("failed to send dora output")?; + } + }; + } + Event::Stop(_) => { + break; + } + Event::InputClosed { id, .. } => { + tracing::info!("Input channel closed for id: {}", id); + } + event => { + eyre::bail!("unexpected event: {:#?}", event) + } + }, + } + } + + Ok(()) +} + +enum ServerEvent { + Result(eyre::Result<()>), + CallNode { + output: String, + data: String, + reply: oneshot::Sender, + }, +} diff --git a/node-hub/dora-mcp-server/src/mcp_server.rs b/node-hub/dora-mcp-server/src/mcp_server.rs new file mode 100644 index 00000000..2a436648 --- /dev/null +++ b/node-hub/dora-mcp-server/src/mcp_server.rs @@ -0,0 +1,129 @@ +use std::sync::Arc; + +use futures::channel::oneshot; +use rmcp::model::{ + CallToolRequest, CallToolResult, EmptyResult, Implementation, InitializeResult, + ListToolsResult, ProtocolVersion, ServerCapabilities, ServerResult, Tool, +}; +use rmcp::model::{ClientRequest, JsonRpcRequest}; +use serde::Deserialize; +use tokio::sync::mpsc; + +use crate::{Config, ServerEvent}; + +#[derive(Debug)] +pub struct McpServer { + tools: Vec, + server_info: Implementation, +} + +#[derive(Deserialize, Debug)] +pub struct McpTool { + pub output: String, + #[serde(flatten)] + pub inner: Tool, +} + +impl McpServer { + pub fn new(config: &Config) -> Self { + let mut tools = Vec::new(); + for tool_config in &config.mcp_tools { + let tool = Tool { + name: tool_config.name.clone().into(), + description: tool_config.description.clone().map(|s| s.into()), + input_schema: Arc::new(tool_config.input_schema.schema()), + annotations: tool_config.annotations.clone(), + }; + tools.push(McpTool { + inner: tool, + output: tool_config.output.clone(), + }); + } + Self { + tools, + server_info: Implementation { + name: config.name.clone(), + version: config.version.clone(), + }, + } + } + + // pub fn tools(&self) -> Vec<&Tool> { + // self.tools.iter().map(|t| &t.inner).collect() + // } + + // pub fn server_info(&self) -> &Implementation { + // &self.server_info + // } + + pub async fn handle_ping(&self) -> eyre::Result { + Ok(EmptyResult {}) + } + pub async fn handle_initialize(&self) -> eyre::Result { + Ok(InitializeResult { + protocol_version: ProtocolVersion::V_2025_03_26, + server_info: self.server_info.clone(), + capabilities: ServerCapabilities { + tools: Some(Default::default()), + ..Default::default() + }, + instructions: None, + }) + } + pub async fn handle_tools_list(&self) -> eyre::Result { + Ok(ListToolsResult { + tools: self.tools.iter().map(|t| t.inner.clone()).collect(), + next_cursor: None, + }) + } + + pub async fn handle_tools_call( + &self, + request: CallToolRequest, + request_tx: &mpsc::Sender, + ) -> eyre::Result { + let (tx, rx) = oneshot::channel(); + + let tool = self + .tools + .iter() + .find(|t| t.inner.name == request.params.name) + .ok_or_else(|| eyre::eyre!("Tool not found: {}", request.params.name))?; + request_tx + .send(ServerEvent::CallNode { + output: tool.output.clone(), + data: serde_json::to_string(&request.params).unwrap(), + reply: tx, + }) + .await?; + + let data: String = rx.await?; + serde_json::from_str(&data) + .map_err(|e| eyre::eyre!("Failed to parse call tool result: {e}")) + } + + pub async fn handle_request( + &self, + rpc_request: JsonRpcRequest, + server_events_tx: &mpsc::Sender, + ) -> eyre::Result { + match rpc_request.request { + ClientRequest::PingRequest(_request) => { + self.handle_ping().await.map(ServerResult::EmptyResult) + } + ClientRequest::InitializeRequest(_request) => self + .handle_initialize() + .await + .map(ServerResult::InitializeResult), + ClientRequest::ListToolsRequest(_request) => self + .handle_tools_list() + .await + .map(ServerResult::ListToolsResult), + ClientRequest::CallToolRequest(request) => self + .handle_tools_call(request, server_events_tx) + .await + .map(ServerResult::CallToolResult), + method => Err(eyre::eyre!("unexpected method: {:#?}", method)), + } + } +} diff --git a/node-hub/dora-mcp-server/src/routing.rs b/node-hub/dora-mcp-server/src/routing.rs new file mode 100644 index 00000000..8331eb7d --- /dev/null +++ b/node-hub/dora-mcp-server/src/routing.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; + +use eyre::Context; +use rmcp::model::{ClientJsonRpcMessage, JsonRpcResponse, JsonRpcVersion2_0}; +use salvo::prelude::*; +use tokio::sync::mpsc; + +use crate::{AppResult, McpServer, ServerEvent}; + +pub fn root( + endpoint: Option, + mcp_server: Arc, + server_events_tx: mpsc::Sender, +) -> Router { + Router::with_hoop(affix_state::inject(mcp_server).inject(server_events_tx)).push( + if let Some(endpoint) = endpoint { + Router::with_path(endpoint) + } else { + Router::new() + } + .post(handle_post) + .delete(handle_delete), + ) +} + +#[handler] +async fn handle_delete(res: &mut Response) { + res.render(Text::Plain("DELETE method is not supported")); +} + +#[handler] +async fn handle_post(req: &mut Request, depot: &mut Depot, res: &mut Response) -> AppResult<()> { + tracing::debug!("Handling the coming chat completion request."); + let server_events_tx = depot + .obtain::>() + .expect("server_events_tx must be exists"); + let mcp_server = depot + .obtain::>() + .expect("mcp server must be exists"); + + tracing::debug!("Prepare the chat completion request."); + + let rpc_message = serde_json::from_slice::(req.payload().await?) + .context("failed to parse request bodyxxx")?; + match rpc_message { + ClientJsonRpcMessage::Request(rpc_request) => { + let response = JsonRpcResponse { + jsonrpc: JsonRpcVersion2_0, + id: rpc_request.id.clone(), + result: mcp_server + .handle_request(rpc_request, server_events_tx) + .await + .unwrap(), + }; + res.render(Json(response)); + } + ClientJsonRpcMessage::Notification(_) + | ClientJsonRpcMessage::Response(_) + | ClientJsonRpcMessage::Error(_) => { + res.render(StatusCode::ACCEPTED); + } + _ => { + res.render( + StatusError::not_implemented().brief("Batch requests are not supported yet"), + ); + } + } + + tracing::debug!("Send the chat completion response."); + Ok(()) +} diff --git a/node-hub/dora-mcp-server/src/utils.rs b/node-hub/dora-mcp-server/src/utils.rs new file mode 100644 index 00000000..4f98cdb0 --- /dev/null +++ b/node-hub/dora-mcp-server/src/utils.rs @@ -0,0 +1,3 @@ +pub(crate) fn gen_call_id() -> String { + format!("call-{}", uuid::Uuid::new_v4()) +}