1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use tokio::signal::unix::{SignalKind, signal};
10use tokio::sync::Notify;
11
12use lyquor_api::{
13 actor::Stop,
14 anyhow::{self, Context as _Context},
15 kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
16 subkey_builder,
17};
18use lyquor_db::{MemDB, RocksDB};
19use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
20use lyquor_lib::{api, lyquid, utils};
21use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
22use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
23use lyquor_vm::log::Log;
24
// Root-level key namespaces. Each accessor below is later combined with a
// `PrefixedKVStore` so that the Lyquid state, the FCO, the log and the event store
// can share one backing key-value store.
// NOTE(review): the bracketed byte presumably is the prefix prepended to keys in that
// namespace — confirm against the `subkey_builder!` definition.
subkey_builder!(RootSubKey(
    ([0x01])-lyquid() => Key,
    ([0x02])-fco() => Key,
    ([0x03])-log() => Key,
    ([0x04])-event_store() => Key,
));
31
32fn subkey() -> &'static RootSubKey {
33 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
34 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
35}
36
/// Root directory (relative to the working directory) under which the node keeps
/// its on-disk database and the dumped anvil state.
const LYQUOR_DB_ROOT: &str = "./lyquor_db";
/// File name used when dumping/loading the local anvil chain state.
const ANVIL_STATE_FILENAME: &str = "anvil.state";

/// PID of the anvil child process, set once by `main` when it spawns anvil.
/// The panic hook reads it to send SIGTERM to that process.
static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
41
/// Message asking [`LyquidDiscovery`] to load a Lyquid (and its listed dependencies)
/// into the pool.
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct LoadLyquid {
    /// The Lyquid to load.
    id: LyquidID,
    /// Lyquids that `id` depends on; forwarded unchanged to the pool.
    deps: Vec<LyquidID>,
}
48
/// Message handing the Lyquid pool address to [`LyquidDiscovery`].
/// Only the first `SetPool` takes effect; later ones are ignored by the handler.
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct SetPool {
    /// Address of the pool actor that discovery should query and load into.
    pool: Addr<lyquid::LyquidPool>,
}
54
/// Actor that resolves Lyquid contract addresses for the sequencer backend and
/// loads newly registered Lyquids into the pool.
struct LyquidDiscovery {
    /// Pool to query/load into; `None` until a `SetPool` message has been handled.
    pool: Option<Addr<lyquid::LyquidPool>>,
    /// Signalled after every load attempt so that pending contract lookups retry.
    register: Arc<Notify>,
    /// ID of the bartender Lyquid, whose first deployment is answered directly.
    bartender_id: LyquidID,
    /// Contract address returned for the bartender when `idx == Some(0)`.
    bartender_addr: Address,
}

// NOTE(review): presumably generates a `Debug` impl (required by the
// `tracing::instrument` attributes that do not skip `self`) — confirm in lyquor_primitives.
lyquor_primitives::debug_struct_name!(LyquidDiscovery);
63
64impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
65 type Result = ResponseFuture<Address>;
66 #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
67 fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
68 let lyquor_seq::eth::GetContract { id, idx } = msg;
69 if id == self.bartender_id && idx == Some(0) {
70 return Box::pin(std::future::ready(self.bartender_addr));
71 }
72 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
73 let reg = self.register.clone();
74 Box::pin(async move {
75 loop {
76 if let Some(n) = idx {
80 match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
81 Ok(Some(info)) => return info.contract,
82 Ok(None) => reg.notified().await,
83 Err(e) => {
84 tracing::error!("Failed to get lyquid deployment info: {}", e);
85 std::future::pending::<()>().await;
87 }
88 }
89 } else {
90 match pool.send(crate::lyquid::GetLatestLyquidInfo { id }).await {
91 Ok(Some(info)) => return info.contract,
92 Ok(None) => reg.notified().await,
93 Err(e) => {
94 tracing::error!("Failed to get latest lyquid info: {}", e);
95 std::future::pending::<()>().await;
96 }
97 }
98 }
99 }
100 })
101 }
102}
103
104impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
107 type Result = ();
108
109 #[tracing::instrument(level = "trace", skip(msg))]
110 fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
111 let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
112 if let Some(event) = event {
113 ctx.spawn(self.load(event.id, event.deps).into_actor(self));
114 }
115 }
116}
117
118impl Handler<LoadLyquid> for LyquidDiscovery {
119 type Result = ResponseFuture<()>;
120
121 #[tracing::instrument(level = "trace")]
122 fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
123 Box::pin(self.load(msg.id, msg.deps))
124 }
125}
126
127impl Handler<SetPool> for LyquidDiscovery {
128 type Result = ();
129
130 #[tracing::instrument(level = "trace")]
131 fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
132 if self.pool.is_some() {
133 return
134 }
135 let pool = msg.pool;
136 let me = ctx.address();
137 self.pool = Some(pool.clone());
138 ctx.spawn(
139 async move {
140 let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
143 None => {
144 tracing::error!("Failed to obtain the registered Lyquid list.");
149 return
150 }
151 Some(lyquids_with_deps) => lyquids_with_deps,
152 };
153 for (id, deps) in lyquids_with_deps.into_iter() {
155 if let Err(e) = me.send(LoadLyquid { id, deps }).await {
156 tracing::error!("Failed to send LoadLyquid message: {}", e);
157 }
158 }
159 }
160 .into_actor(self),
161 );
162 }
163}
164
165impl LyquidDiscovery {
166 fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
167 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
168 let register = self.register.clone();
169 async move {
170 match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
171 Ok(result) => match result {
172 Ok(already_exists) => {
173 if !already_exists {
174 tracing::info!("Discovered {}", id);
175 }
176 }
177 Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
178 },
179 Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
180 }
181 register.notify_waiters();
182 }
183 }
184}
185
impl Handler<Stop> for LyquidDiscovery {
    type Result = ();

    /// Stops this actor's context in response to a `Stop` message.
    #[tracing::instrument(level = "trace")]
    fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
        ctx.stop();
    }
}
194
impl Actor for LyquidDiscovery {
    type Context = actix::Context<Self>;

    /// No start-up work; the hook exists only so the lifecycle event is traced.
    #[tracing::instrument(level = "trace", skip(_ctx))]
    fn started(&mut self, _ctx: &mut Self::Context) {}

    /// No tear-down work; the hook exists only so the lifecycle event is traced.
    #[tracing::instrument(level = "trace", skip(_ctx))]
    fn stopped(&mut self, _ctx: &mut Self::Context) {}
}
204
/// Actor that flushes the shadow key-value store to its backend, periodically and on stop.
struct DBFlusher {
    /// Block number reported in the "written" log line.
    /// NOTE(review): never updated in the code visible here (initialised to 0) — confirm intent.
    last_bn: u64,
    /// Store whose buffered writes are flushed.
    store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
    /// FCO actor that is asked to `Commit` before each flush.
    fco: Addr<FCO>,
    /// JSON-RPC client of the locally started anvil chain, if its state should be dumped.
    anvil_chain: Option<ClientHandle>,
    /// Period of the automatic flush in seconds; `0` disables the timer.
    flush_interval_secs: u64,
}

// NOTE(review): presumably generates a `Debug` impl for use by `tracing::instrument` — confirm.
lyquor_primitives::debug_struct_name!(DBFlusher);
214
215impl DBFlusher {
216 fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
217 let anvil_chain = self.anvil_chain.clone();
218 let fco = self.fco.clone();
219 async move {
220 fco.send(lyquor_seq::fco::Commit)
221 .await
222 .map_err(anyhow::Error::from)
223 .and_then(|e| e.map_err(anyhow::Error::from))?;
224
225 if let Some(client) = &anvil_chain {
226 use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
234 let dump = client
235 .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
236 .await
237 .map(|r| r.0);
238
239 match dump {
240 Ok(state) => {
241 let devnet_path = format!("{}/devnet", LYQUOR_DB_ROOT);
243 std::fs::create_dir_all(&devnet_path)?;
244 std::fs::write(&format!("{}/{}", devnet_path, ANVIL_STATE_FILENAME), &state)?;
245 }
246 Err(_) => {
247 tracing::error!(
248 "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
249 );
250 }
251 }
252 }
253
254 Ok(())
255 }
256 }
257
258 fn flush(&mut self, res: anyhow::Result<()>) {
259 if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
260 tracing::error!("Failed to write: {e}");
261 } else {
262 tracing::info!("Written to the storage backend (block={}).", self.last_bn);
263 }
264 }
265}
266
/// Message asking [`DBFlusher`] to perform one flush cycle.
#[derive(Message, Debug)]
#[rtype(result = "()")]
struct FlushDB;
270
271impl Handler<FlushDB> for DBFlusher {
272 type Result = ResponseActFuture<Self, ()>;
273
274 #[tracing::instrument(level = "trace", skip(self, _ctx))]
275 fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
276 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
277 actor.flush(res);
278 }))
279 }
280}
281
282impl Handler<Stop> for DBFlusher {
283 type Result = ResponseActFuture<Self, ()>;
284
285 #[tracing::instrument(level = "trace")]
286 fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
287 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
288 actor.flush(res);
289 }))
290 }
291}
292
293impl Actor for DBFlusher {
294 type Context = actix::Context<Self>;
295
296 #[tracing::instrument(level = "trace", skip_all)]
297 fn started(&mut self, ctx: &mut Self::Context) {
298 if self.flush_interval_secs > 0 {
299 let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
300 ctx.run_interval(periodic_write, |_act, ctx| {
301 ctx.address().do_send(FlushDB);
302 });
303 }
304 }
305
306 #[tracing::instrument(level = "trace", skip_all)]
307 fn stopped(&mut self, _ctx: &mut Self::Context) {}
308}
309
310async fn setup_eth_backend(
311 profile: &utils::LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
312) -> anyhow::Result<eth::Backend> {
313 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
314 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
315
316 Ok(eth::Backend::new(
317 eth::Config::builder()
318 .finality_tag(finality_tag)
319 .registry(reg)
320 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
321 .build(),
322 jsonrpc_client,
323 ))
324}
325
/// Handles to every subsystem started by [`Node::init`].
/// Each field is `None` until the corresponding subsystem has been started, and is
/// taken (reset to `None`) when it is stopped.
#[derive(Default)]
struct Subsystems {
    /// FCO actor.
    fco: Option<Addr<FCO>>,
    /// Log actor.
    log: Option<Addr<Log>>,
    /// Lyquid pool actor.
    pool: Option<Addr<lyquid::LyquidPool>>,
    /// Ethereum API server actor.
    api: Option<Addr<api::EthAPIServer>>,
    /// Database flusher actor.
    db: Option<Addr<DBFlusher>>,
    /// Lyquid discovery actor.
    dis: Option<Addr<LyquidDiscovery>>,
    /// JSON-RPC client connected to the sequencer endpoint.
    client: Option<ClientHandle>,
    /// Ethereum sequencing backend.
    eth_backend: Option<lyquor_seq::eth::Backend>,
}
// NOTE(review): presumably generates a `Debug` impl for use by `tracing::instrument` — confirm.
debug_struct_name!(Subsystems);
338
339impl Subsystems {
340 #[tracing::instrument(level = "trace")]
341 async fn stop(&mut self) {
342 if let Some(api) = self.api.take() {
343 api.send(Stop).await.ok();
344 }
345 if let Some(eth_backend) = self.eth_backend.take() {
346 eth_backend.stop().await;
347 }
348 if let Some(log) = self.log.take() {
349 log.send(Stop).await.ok();
350 }
351 if let Some(dis) = self.dis.take() {
352 dis.send(Stop).await.ok();
353 }
354 if let Some(pool) = self.pool.take() {
355 pool.send(Stop).await.ok();
356 }
357 if let Some(db) = self.db.take() {
358 db.send(Stop).await.ok();
359 }
360 if let Some(dis) = self.dis.take() {
361 dis.send(Stop).await.ok();
362 }
363 if let Some(fco) = self.fco.take() {
364 fco.send(Stop).await.ok();
365 }
366 }
367}
368
/// Start-up configuration of a [`Node`], assembled from the command line in `main`.
struct NodeConfig {
    /// Use the in-memory database instead of RocksDB (all data is lost on exit).
    mem_db: bool,
    /// URL of the Ethereum sequencer endpoint the JSON-RPC client connects to.
    seq_eth_url: String,
    /// Serving address of the Ethereum API server.
    api_url: String,
    /// Network profile (bartender identity, finality tag, initial chain position, ...).
    profile: utils::LyquorProfile,
    /// Whether this process spawned its own anvil instance.
    started_anvil: bool,
    /// Database flush period in seconds; `0` disables periodic flushing.
    flush_interval_secs: u64,
}
377
/// Top-level actor that owns all subsystems of a running node.
struct Node {
    /// Started subsystems; empty until initialisation has completed.
    sys: Subsystems,
    /// Shared start-up configuration.
    config: Arc<NodeConfig>,
    /// Network hub, consumed by initialisation when the actor starts.
    network: Option<lyquor_net::hub::Hub>,
}

// NOTE(review): presumably generates a `Debug` impl for use by `tracing::instrument` — confirm.
lyquor_primitives::debug_struct_name!(Node);
386
387impl Handler<Stop> for Node {
388 type Result = AtomicResponse<Self, ()>;
389
390 #[tracing::instrument(level = "trace")]
391 fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
392 let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
393 AtomicResponse::new(Box::pin(
394 async move {
395 sys.stop().await;
396 }
397 .into_actor(self)
398 .map(|_, _act, ctx| {
399 ctx.stop();
400 }),
401 ))
402 }
403}
404
405impl Actor for Node {
406 type Context = Context<Self>;
407
408 #[tracing::instrument(level = "trace", skip(ctx))]
409 fn started(&mut self, ctx: &mut Self::Context) {
410 ctx.spawn(
412 Self::init(
413 self.config.clone(),
414 self.network.take().expect("Node: Network not found."),
415 )
416 .into_actor(self)
417 .map(|sys, act, ctx| {
418 match sys {
419 Ok(sys) => {
420 act.sys = sys;
421 }
422 Err(e) => {
423 tracing::error!("Failed to initialize node: {e:?}");
424 ctx.stop();
425 }
426 };
427 }),
428 );
429 }
430
431 #[tracing::instrument(level = "trace", skip(_ctx))]
432 fn stopped(&mut self, _ctx: &mut Self::Context) {}
433}
434
435impl Node {
436 #[tracing::instrument(level = "trace", skip_all)]
437 async fn init(config: Arc<NodeConfig>, network: lyquor_net::hub::Hub) -> anyhow::Result<Subsystems> {
438 let mut sys = Subsystems::default();
439
440 let store: Arc<ShadowKVStore<Box<dyn KVStore>>>;
442 let lyq_store: Arc<dyn utils::LVMStoreFactory> = if config.mem_db {
443 tracing::warn!("using MemDB, all data will be lost upon exit");
444 store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new())));
445 Arc::new(utils::LVMMemStore)
446 } else {
447 store = Arc::new(ShadowKVStore::new(Box::new(RocksDB::new(
448 &Path::new(LYQUOR_DB_ROOT).join(config.profile.id()),
449 )?)));
450 Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
451 store.clone(),
452 subkey().lyquid(),
453 ))))
454 };
455 let dis = LyquidDiscovery {
460 pool: None,
461 register: Arc::new(Notify::new()),
462 bartender_id: *config.profile.bartender_id(),
463 bartender_addr: Address::from_str(config.profile.bartender_addr())
464 .context("Invalid bartender address in profile.")?
465 .into(),
466 }
467 .start();
468 sys.dis = Some(dis.clone());
469
470 let jsonrpc_client = ClientConfig::builder()
472 .url(config.seq_eth_url.clone().parse()?)
473 .build()
474 .into_client();
475 sys.client = Some(jsonrpc_client.clone());
476
477 let anvil_chain = if config.started_anvil && !config.mem_db {
479 let state_file = format!("{}/{}/{}", LYQUOR_DB_ROOT, config.profile.id(), ANVIL_STATE_FILENAME);
480 if std::path::Path::new(&state_file).exists() {
481 use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
482 let state_data = std::fs::read(&state_file)?;
483 let load_result = jsonrpc_client
484 .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
485 .await?;
486 tracing::info!("Loaded anvil state from {}: {}.", state_file, load_result.0);
487 } else {
488 tracing::info!(
489 "No existing anvil state file found at {}, starting with fresh state.",
490 state_file
491 );
492 }
493 Some(jsonrpc_client.clone())
494 } else {
495 None
496 };
497
498 let eth_backend = setup_eth_backend(&config.profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
499
500 let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
501 let fco = FCO::new(
502 eth_backend.clone(),
503 Arc::new(fco_store),
504 Some(config.profile.init_chain_position()),
505 )?
506 .start();
507 sys.fco = Some(fco.clone());
508 sys.eth_backend = Some(eth_backend);
510 let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
515 let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
516 sys.log = Some(log.clone());
517
518 let lvm = lyquid::LVM::new(
520 log.clone(),
521 network,
522 10,
523 Some(lyquor_vm::InterOps {
524 inter: fco.clone().recipient(),
525 submit: fco.clone().recipient(),
526 }),
527 );
528 let api_subs = api::Subscriptions::new().start();
529 let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
530 Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
531 1000,
532 ));
533 let image_repo_store = RocksDB::new(Path::new("./lyquid_image_db"))?;
537 let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
538 let bartender_id = *config.profile.bartender_id();
539 let pool = lyquid::LyquidPool::new(
540 bartender_id,
541 lvm,
542 lyquid::LyquidPoolSetup {
543 fco: fco.clone(),
544 store_factory: lyq_store,
545 image_repo,
546 event_store,
547 api_subs: api_subs.clone(),
548 config: lyquid::LyquidConfig::builder().build(),
549 },
550 )
551 .await?
552 .start();
553 sys.pool = Some(pool.clone());
554 let bartender_setup = {
558 let register_topic = LyteLog::tagged_value_topic("Register");
559 match log
560 .send(lyquor_vm::log::GetNumberOfRecords {
561 id: bartender_id,
562 topic: register_topic,
563 })
564 .await
565 {
566 Ok(count) if count > 0 => None,
567 Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
568 id: bartender_id,
569 topic: register_topic,
570 })),
571 Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
572 }
573 };
574
575 let api = lyquor_lib::api::EthAPIServer::new(
577 &config.api_url,
578 jsonrpc_client.clone(),
579 api_subs,
580 fco.clone(),
581 pool.clone(),
582 )
583 .await?;
584 sys.api = Some(api.start());
585
586 if let Some(reg) = bartender_setup {
588 tracing::info!("Waiting for bartender to be registered.");
589 reg.await.context("Failed to wait for bartender registration.")?;
590 }
591 let bartender_ins = pool
593 .send(lyquid::GetBartender)
594 .await
595 .expect("Failed to obtain bartender.");
596 let bartender_ln = bartender_ins
597 .instance()
598 .read()
599 .max_number()
600 .unwrap_or(LyquidNumber::ZERO);
601 tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
602 bartender_ins
603 .wait_until(bartender_ln)
604 .await
605 .context("Failed to wait for the initial re-execution of bartender.")?;
606
607 log.send(lyquor_vm::log::Subscribe {
609 id: bartender_id,
610 topic: LyteLog::tagged_value_topic("Register"),
611 subscriber: dis.clone().recipient(),
612 })
613 .await
614 .context("Failed to subscribe to log system for LyquidDiscovery.")?;
615 dis.send(SetPool { pool: pool.clone() })
616 .await
617 .context("Failed to set pool in LyquidDiscovery.")?;
618
619 tracing::info!("INIT COMPLETE (bartender={bartender_id})");
620
621 let db = DBFlusher {
623 last_bn: 0,
624 store,
625 fco,
626 anvil_chain,
627 flush_interval_secs: config.flush_interval_secs,
628 }
629 .start();
630 sys.db = Some(db);
631
632 Ok(sys)
633 }
634}
635
636fn generate_unused_port() -> anyhow::Result<u16> {
637 let listener = std::net::TcpListener::bind("127.0.0.1:0").context("Failed to bind to local address.")?;
638 let port = listener.local_addr().context("Failed to get local address.")?.port();
639 Ok(port)
640}
641
642fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
643 let anvil_port = generate_unused_port()?;
644 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
647 let mut sigset = SigSet::empty();
648 sigset.add(Signal::SIGINT);
649 let mut old_mask = SigSet::empty();
650 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
651 .context("Failed to mask SIGINT for anvil.")?;
652
653 let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
654
655 let stdout_reader = std::io::BufReader::new(
656 anvil
657 .child_mut()
658 .stdout
659 .take()
660 .context("Failed to read from anvil stdout.")?,
661 );
662
663 tokio::task::spawn_blocking(move || {
664 use std::io::BufRead;
665 let mut stdout_lines = stdout_reader.lines();
666 while let Some(line) = stdout_lines.next() {
667 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
668 }
669 });
670
671 tracing::info!("Anvil started at port {}.", anvil_port);
672 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
673 .context("Failed to restore old signal mask for anvil.")?;
674
675 Ok(anvil)
676}
677
678#[actix::main]
679async fn main() -> anyhow::Result<()> {
680 std::panic::set_hook(Box::new(|_| {
682 if let Some(&pid) = ANVIL_PID.get() {
683 tracing::warn!("Panic detected, killing anvil process {}", pid);
684 use nix::sys::signal::{Signal, kill};
685 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
686 }
687 }));
688
689 lyquor_cli::setup_tracing()?;
690
691 println!("{}", lyquor_cli::format_logo_banner());
692
693 let matches = clap::command!()
694 .propagate_version(true)
695 .arg(
696 clap::arg!(--"mem-db" "Use in-memory DB simulation (for debugging or some benchmarks).")
697 .required(false)
698 .action(clap::ArgAction::SetTrue),
699 )
700 .arg(
701 clap::arg!(--"api-eth" <IP_AND_PORT> "Serving address for Ethereum API.")
702 .required(false)
703 .default_value("localhost:10087"),
704 )
705 .arg(
706 clap::arg!(--"upc-addr" <IP_AND_PORT> "Serving address for UPC.")
707 .required(false)
708 .default_value("localhost:10080"),
709 )
710 .arg(
711 clap::arg!(--network <NETWORK> "Select the network to run. (Available: devnet/testnet/mainnet)")
712 .required(false)
713 .default_value("devnet"),
714 )
715 .arg(
716 clap::arg!(--"force-seq-eth" <WS_URL> "Override the WebSocket url for Ethereum API Sequencer.")
717 .required(false),
718 )
719 .arg(
720 clap::arg!(--"flush-interval" <SECONDS> "Interval in seconds for database flushing (0 to disable).")
721 .required(false)
722 .value_parser(clap::value_parser!(u64)),
723 )
724 .get_matches();
725
726 let (_, tls_config) = lyquor_tls::generator::single_node_config();
728 let addr = matches.get_one::<String>("upc-addr").unwrap();
729 let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
730 .listen_addr(addr.clone())
731 .build()
732 .context("Failed to setup network.")?;
733 network.start().await.context("Failed to start network.")?;
734
735 let mut _anvil: Option<AnvilInstance> = None;
736 let network_name = matches.get_one::<String>("network").map(|s| s.as_str());
737 let profile: utils::LyquorProfile = match network_name {
738 Some("devnet") => {
739 let ws_url = match matches.get_one::<String>("force-seq-eth") {
740 None => {
741 let anvil = start_devnet_anvil()?;
742 ANVIL_PID.set(anvil.child().id()).ok();
744 let ep = anvil.ws_endpoint();
745 _anvil = Some(anvil);
746 ep
747 }
748 Some(url) => url.to_string(),
749 };
750
751 utils::Devnet::new(ws_url).into()
752 }
753 _ => return Err(anyhow::anyhow!("Network is not supported.")),
754 };
755 let seq_eth_url = matches
756 .get_one::<String>("force-seq-eth")
757 .cloned()
758 .unwrap_or_else(|| profile.sequencer().to_string());
759 let api_url = matches.get_one::<String>("api-eth").cloned().unwrap();
760 let mem_db = matches.get_flag("mem-db");
761 let flush_interval_secs = matches
762 .get_one::<u64>("flush-interval")
763 .copied()
764 .unwrap_or_else(|| match network_name {
765 Some("devnet") => 0,
766 _ => 10,
767 });
768
769 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
772 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
773
774 let node = Node {
775 sys: Subsystems::default(),
776 config: Arc::new(NodeConfig {
777 mem_db,
778 seq_eth_url,
779 api_url,
780 profile,
781 started_anvil: _anvil.is_some(),
782 flush_interval_secs,
783 }),
784 network: Some(network),
785 }
786 .start();
787
788 tokio::select! {
789 _ = sigint.recv() => {
790 tracing::info!("received SIGINT");
791 }
792 _ = sigterm.recv() => {
793 tracing::info!("received SIGTERM");
794 }
795 }
796
797 Ok(node.send(Stop).await?)
798}