1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use serde_json::{Map, Value};
10use tokio::signal::unix::{SignalKind, signal};
11use tokio::sync::Notify;
12
13use lyquor_api::{
14 actor::Stop,
15 anyhow::{self, Context as _Context},
16 config::{DB, NodeConfig},
17 kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
18 profile::{Devnet, LyquorProfile, NetworkType},
19 subkey_builder,
20};
21use lyquor_db::{MemDB, RocksDB};
22use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
23use lyquor_lib::{api, lyquid, utils};
24use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
25use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
26use lyquor_vm::log::Log;
27
28subkey_builder!(RootSubKey(
29 ([0x01])-lyquid() => Key,
30 ([0x02])-fco() => Key,
31 ([0x03])-log() => Key,
32 ([0x04])-event_store() => Key,
33));
34
35fn subkey() -> &'static RootSubKey {
36 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
37 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
38}
39
40const ANVIL_STATE_FILENAME: &str = "anvil.state";
41
42static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
43
44#[derive(Debug, Message)]
45#[rtype(result = "()")]
46struct LoadLyquid {
47 id: LyquidID,
48 deps: Vec<LyquidID>,
49}
50
51#[derive(Debug, Message)]
52#[rtype(result = "()")]
53struct SetPool {
54 pool: Addr<lyquid::LyquidPool>,
55}
56
57struct LyquidDiscovery {
58 pool: Option<Addr<lyquid::LyquidPool>>,
59 register: Arc<Notify>,
60 bartender_id: LyquidID,
61 bartender_addr: Address,
62}
63
64lyquor_primitives::debug_struct_name!(LyquidDiscovery);
65
66impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
67 type Result = ResponseFuture<Address>;
68 #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
69 fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
70 let lyquor_seq::eth::GetContract { id, idx } = msg;
71 if id == self.bartender_id && idx == Some(0) {
72 return Box::pin(std::future::ready(self.bartender_addr));
73 }
74 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
75 let reg = self.register.clone();
76 Box::pin(async move {
77 loop {
78 if let Some(n) = idx {
82 match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
83 Ok(Some(info)) => return info.contract,
84 Ok(None) => reg.notified().await,
85 Err(e) => {
86 tracing::error!("Failed to get lyquid deployment info: {}", e);
87 std::future::pending::<()>().await;
89 }
90 }
91 } else {
92 match pool.send(crate::lyquid::GetLatestLyquidInfo { id }).await {
93 Ok(Some(info)) => return info.contract,
94 Ok(None) => reg.notified().await,
95 Err(e) => {
96 tracing::error!("Failed to get latest lyquid info: {}", e);
97 std::future::pending::<()>().await;
98 }
99 }
100 }
101 }
102 })
103 }
104}
105
106impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
109 type Result = ();
110
111 #[tracing::instrument(level = "trace", skip(msg))]
112 fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
113 let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
114 if let Some(event) = event {
115 ctx.spawn(self.load(event.id, event.deps).into_actor(self));
116 }
117 }
118}
119
120impl Handler<LoadLyquid> for LyquidDiscovery {
121 type Result = ResponseFuture<()>;
122
123 #[tracing::instrument(level = "trace")]
124 fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
125 Box::pin(self.load(msg.id, msg.deps))
126 }
127}
128
129impl Handler<SetPool> for LyquidDiscovery {
130 type Result = ();
131
132 #[tracing::instrument(level = "trace")]
133 fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
134 if self.pool.is_some() {
135 return;
136 }
137 let pool = msg.pool;
138 let me = ctx.address();
139 self.pool = Some(pool.clone());
140 ctx.spawn(
141 async move {
142 let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
145 None => {
146 tracing::error!("Failed to obtain the registered Lyquid list.");
151 return;
152 }
153 Some(lyquids_with_deps) => lyquids_with_deps,
154 };
155 for (id, deps) in lyquids_with_deps.into_iter() {
157 if let Err(e) = me.send(LoadLyquid { id, deps }).await {
158 tracing::error!("Failed to send LoadLyquid message: {}", e);
159 }
160 }
161 }
162 .into_actor(self),
163 );
164 }
165}
166
167impl LyquidDiscovery {
168 fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
169 let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
170 let register = self.register.clone();
171 async move {
172 match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
173 Ok(result) => match result {
174 Ok(already_exists) => {
175 if !already_exists {
176 tracing::info!("Discovered {}", id);
177 }
178 }
179 Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
180 },
181 Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
182 }
183 register.notify_waiters();
184 }
185 }
186}
187
188impl Handler<Stop> for LyquidDiscovery {
189 type Result = ();
190
191 #[tracing::instrument(level = "trace")]
192 fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
193 ctx.stop();
194 }
195}
196
197impl Actor for LyquidDiscovery {
198 type Context = actix::Context<Self>;
199
200 #[tracing::instrument(level = "trace", skip(_ctx))]
201 fn started(&mut self, _ctx: &mut Self::Context) {}
202
203 #[tracing::instrument(level = "trace", skip(_ctx))]
204 fn stopped(&mut self, _ctx: &mut Self::Context) {}
205}
206
207struct DBFlusher {
208 last_bn: u64,
209 db_dir: PathBuf,
210 store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
211 fco: Addr<FCO>,
212 anvil_chain: Option<ClientHandle>,
213 flush_interval_secs: u64,
214}
215
216lyquor_primitives::debug_struct_name!(DBFlusher);
217
218impl DBFlusher {
219 fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
220 let anvil_chain = self.anvil_chain.clone();
221 let fco = self.fco.clone();
222 let db_dir = self.db_dir.clone();
223 async move {
224 fco.send(lyquor_seq::fco::Commit)
225 .await
226 .map_err(anyhow::Error::from)
227 .and_then(|e| e.map_err(anyhow::Error::from))?;
228
229 if let Some(client) = &anvil_chain {
230 use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
238 let dump = client
239 .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
240 .await
241 .map(|r| r.0);
242
243 match dump {
244 Ok(state) => {
245 let devnet_path = db_dir.join("devnet");
247 std::fs::create_dir_all(&devnet_path)?;
248 std::fs::write(devnet_path.join(ANVIL_STATE_FILENAME), &state)?;
249 }
250 Err(_) => {
251 tracing::error!(
252 "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
253 );
254 }
255 }
256 }
257
258 Ok(())
259 }
260 }
261
262 fn flush(&mut self, res: anyhow::Result<()>) {
263 if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
264 tracing::error!("Failed to write: {e}");
265 } else {
266 tracing::info!("Written to the storage backend (block={}).", self.last_bn);
267 }
268 }
269}
270
271#[derive(Message, Debug)]
272#[rtype(result = "()")]
273struct FlushDB;
274
275impl Handler<FlushDB> for DBFlusher {
276 type Result = ResponseActFuture<Self, ()>;
277
278 #[tracing::instrument(level = "trace", skip(self, _ctx))]
279 fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
280 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
281 actor.flush(res);
282 }))
283 }
284}
285
286impl Handler<Stop> for DBFlusher {
287 type Result = ResponseActFuture<Self, ()>;
288
289 #[tracing::instrument(level = "trace")]
290 fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
291 Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
292 actor.flush(res);
293 }))
294 }
295}
296
297impl Actor for DBFlusher {
298 type Context = actix::Context<Self>;
299
300 #[tracing::instrument(level = "trace", skip_all)]
301 fn started(&mut self, ctx: &mut Self::Context) {
302 if self.flush_interval_secs > 0 {
303 let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
304 ctx.run_interval(periodic_write, |_act, ctx| {
305 ctx.address().do_send(FlushDB);
306 });
307 }
308 }
309
310 #[tracing::instrument(level = "trace", skip_all)]
311 fn stopped(&mut self, _ctx: &mut Self::Context) {}
312}
313
314async fn setup_eth_backend(
315 profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
316) -> anyhow::Result<eth::Backend> {
317 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
318 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
319
320 Ok(eth::Backend::new(
321 eth::Config::builder()
322 .finality_tag(finality_tag)
323 .registry(reg)
324 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
325 .build(),
326 jsonrpc_client,
327 ))
328}
329
330#[derive(Default)]
331struct Subsystems {
332 fco: Option<Addr<FCO>>,
333 log: Option<Addr<Log>>,
334 pool: Option<Addr<lyquid::LyquidPool>>,
335 api: Option<Addr<api::EthAPIServer>>,
336 db: Option<Addr<DBFlusher>>,
337 dis: Option<Addr<LyquidDiscovery>>,
338 client: Option<ClientHandle>,
339 eth_backend: Option<lyquor_seq::eth::Backend>,
340}
341debug_struct_name!(Subsystems);
342
343impl Subsystems {
344 #[tracing::instrument(level = "trace")]
345 async fn stop(&mut self) {
346 if let Some(api) = self.api.take() {
347 api.send(Stop).await.ok();
348 }
349 if let Some(eth_backend) = self.eth_backend.take() {
350 eth_backend.stop().await;
351 }
352 if let Some(log) = self.log.take() {
353 log.send(Stop).await.ok();
354 }
355 if let Some(dis) = self.dis.take() {
356 dis.send(Stop).await.ok();
357 }
358 if let Some(pool) = self.pool.take() {
359 pool.send(Stop).await.ok();
360 }
361 if let Some(db) = self.db.take() {
362 db.send(Stop).await.ok();
363 }
364 if let Some(dis) = self.dis.take() {
365 dis.send(Stop).await.ok();
366 }
367 if let Some(fco) = self.fco.take() {
368 fco.send(Stop).await.ok();
369 }
370 }
371}
372
373struct Node {
375 sys: Subsystems,
376 config: Arc<NodeConfig>,
377 profile: Arc<LyquorProfile>,
378 seq_eth_url: String,
379 started_anvil: bool,
380 network: Option<lyquor_net::hub::Hub>,
381}
382
383lyquor_primitives::debug_struct_name!(Node);
384
385impl Handler<Stop> for Node {
386 type Result = AtomicResponse<Self, ()>;
387
388 #[tracing::instrument(level = "trace")]
389 fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
390 let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
391 AtomicResponse::new(Box::pin(
392 async move {
393 sys.stop().await;
394 }
395 .into_actor(self)
396 .map(|_, _act, ctx| {
397 ctx.stop();
398 }),
399 ))
400 }
401}
402
403impl Actor for Node {
404 type Context = Context<Self>;
405
406 #[tracing::instrument(level = "trace", skip(ctx))]
407 fn started(&mut self, ctx: &mut Self::Context) {
408 ctx.spawn(
410 Self::init(
411 self.config.clone(),
412 self.profile.clone(),
413 self.seq_eth_url.clone(),
414 self.started_anvil,
415 self.network.take().expect("Node: Network not found."),
416 )
417 .into_actor(self)
418 .map(|sys, act, ctx| {
419 match sys {
420 Ok(sys) => {
421 act.sys = sys;
422 }
423 Err(e) => {
424 tracing::error!("Failed to initialize node: {e:?}");
425 ctx.stop();
426 }
427 };
428 }),
429 );
430 }
431
432 #[tracing::instrument(level = "trace", skip(_ctx))]
433 fn stopped(&mut self, _ctx: &mut Self::Context) {}
434}
435
436impl Node {
437 #[tracing::instrument(level = "trace", skip_all)]
438 async fn init(
439 config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String, started_anvil: bool,
440 network: lyquor_net::hub::Hub,
441 ) -> anyhow::Result<Subsystems> {
442 let mut sys = Subsystems::default();
443
444 let jsonrpc_client = ClientConfig::builder().url(seq_eth_url.parse()?).build().into_client();
446 sys.client = Some(jsonrpc_client.clone());
447
448 let (store, lyq_store, anvil_chain): (
450 Arc<ShadowKVStore<Box<dyn KVStore>>>,
451 Arc<dyn utils::LVMStoreFactory>,
452 Option<ClientHandle>,
453 ) = match &config.storage.db {
454 DB::MemDb => {
455 tracing::warn!("using MemDB, all data will be lost upon exit");
456 let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
457 (store, Arc::new(utils::LVMMemStore), None)
458 }
459 DB::RocksDb { db_dir } => {
460 let base_dir = db_dir.clone();
461 let store = Arc::new(ShadowKVStore::new(
462 Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
463 ));
464 let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
465 store.clone(),
466 subkey().lyquid(),
467 ))));
468
469 let anvil_chain = if started_anvil {
471 let state_file = db_dir.join(profile.id()).join(ANVIL_STATE_FILENAME);
472 if state_file.exists() {
473 use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
474 let state_data = std::fs::read(&state_file)?;
475 let load_result = jsonrpc_client
476 .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
477 .await?;
478 tracing::info!("Loaded anvil state from {}: {}.", state_file.display(), load_result.0);
479 } else {
480 tracing::info!(
481 "No existing anvil state file found at {}, starting with fresh state.",
482 state_file.display()
483 );
484 }
485 Some(jsonrpc_client.clone())
486 } else {
487 None
488 };
489
490 (store, lyq_store, anvil_chain)
491 }
492 };
493 let dis = LyquidDiscovery {
498 pool: None,
499 register: Arc::new(Notify::new()),
500 bartender_id: *profile.bartender_id(),
501 bartender_addr: Address::from_str(profile.bartender_addr())
502 .context("Invalid bartender address in profile.")?
503 .into(),
504 }
505 .start();
506 sys.dis = Some(dis.clone());
507
508 let eth_backend = setup_eth_backend(&profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
509
510 let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
511 let fco = FCO::new(
512 eth_backend.clone(),
513 Arc::new(fco_store),
514 Some(profile.init_chain_position()),
515 )?
516 .start();
517 sys.fco = Some(fco.clone());
518 sys.eth_backend = Some(eth_backend);
520 let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
525 let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
526 sys.log = Some(log.clone());
527
528 let mut lvm = lyquid::LVM::new(
529 log.clone(),
530 network,
531 10,
532 Some(lyquor_vm::InterOps {
533 inter: fco.clone().recipient(),
534 submit: fco.clone().recipient(),
535 }),
536 );
537 let api_subs = api::Subscriptions::new().start();
538 let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
539 Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
540 1000,
541 ));
542 for peer in &config.peers {
546 lvm.upc_mut()
547 .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
548 .await
549 .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
550 }
551 let image_repo_store = RocksDB::new(&config.storage.image_repo_dir)?;
555 let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
556 let bartender_id = *profile.bartender_id();
557 let pool = lyquid::LyquidPool::new(
558 bartender_id,
559 lvm,
560 lyquid::LyquidPoolSetup {
561 fco: fco.clone(),
562 store_factory: lyq_store,
563 image_repo,
564 event_store,
565 api_subs: api_subs.clone(),
566 config: lyquid::LyquidConfig::builder().build(),
567 },
568 )
569 .await?
570 .start();
571 sys.pool = Some(pool.clone());
572 let bartender_setup = {
576 let register_topic = LyteLog::tagged_value_topic("Register");
577 match log
578 .send(lyquor_vm::log::GetNumberOfRecords {
579 id: bartender_id,
580 topic: register_topic,
581 })
582 .await
583 {
584 Ok(count) if count > 0 => None,
585 Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
586 id: bartender_id,
587 topic: register_topic,
588 })),
589 Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
590 }
591 };
592
593 let api = lyquor_lib::api::EthAPIServer::new(
595 &config.network.api_addr,
596 jsonrpc_client.clone(),
597 api_subs,
598 fco.clone(),
599 pool.clone(),
600 )
601 .await?;
602 sys.api = Some(api.start());
603
604 if let Some(reg) = bartender_setup {
606 tracing::info!("Waiting for bartender to be registered.");
607 reg.await.context("Failed to wait for bartender registration.")?;
608 }
609 let bartender_ins = pool
611 .send(lyquid::GetBartender)
612 .await
613 .expect("Failed to obtain bartender.");
614 let bartender_ln = bartender_ins
615 .instance()
616 .read()
617 .max_number()
618 .unwrap_or(LyquidNumber::ZERO);
619 tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
620 bartender_ins
621 .wait_until(bartender_ln)
622 .await
623 .context("Failed to wait for the initial re-execution of bartender.")?;
624
625 log.send(lyquor_vm::log::Subscribe {
627 id: bartender_id,
628 topic: LyteLog::tagged_value_topic("Register"),
629 subscriber: dis.clone().recipient(),
630 })
631 .await
632 .context("Failed to subscribe to log system for LyquidDiscovery.")?;
633 dis.send(SetPool { pool: pool.clone() })
634 .await
635 .context("Failed to set pool in LyquidDiscovery.")?;
636
637 tracing::info!("INIT COMPLETE (bartender={bartender_id})");
638
639 let db = DBFlusher {
641 last_bn: 0,
642 db_dir: config.resolved_db_dir(),
643 store,
644 fco,
645 anvil_chain,
646 flush_interval_secs: config.flush_interval_secs,
647 }
648 .start();
649 sys.db = Some(db);
650
651 Ok(sys)
652 }
653}
654
655fn generate_unused_port() -> anyhow::Result<u16> {
656 let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
657 let port = listener.local_addr().context("Failed to get local address.")?.port();
658 Ok(port)
659}
660
661fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
662 let anvil_port = generate_unused_port()?;
663 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
666 let mut sigset = SigSet::empty();
667 sigset.add(Signal::SIGINT);
668 let mut old_mask = SigSet::empty();
669 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
670 .context("Failed to mask SIGINT for anvil.")?;
671
672 let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
673
674 let stdout_reader = std::io::BufReader::new(
675 anvil
676 .child_mut()
677 .stdout
678 .take()
679 .context("Failed to read from anvil stdout.")?,
680 );
681
682 tokio::task::spawn_blocking(move || {
683 use std::io::BufRead;
684 let mut stdout_lines = stdout_reader.lines();
685 while let Some(line) = stdout_lines.next() {
686 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
687 }
688 });
689
690 tracing::info!("Anvil started at port {}.", anvil_port);
691 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
692 .context("Failed to restore old signal mask for anvil.")?;
693
694 Ok(anvil)
695}
696
697fn parse_override_value(raw: &str) -> Value {
698 if let Ok(v) = raw.parse::<bool>() {
699 return Value::from(v);
700 }
701 if let Ok(v) = raw.parse::<u64>() {
702 return Value::from(v);
703 }
704 if let Ok(v) = raw.parse::<i64>() {
705 return Value::from(v);
706 }
707 if let Ok(v) = raw.parse::<f64>() {
708 return Value::from(v);
709 }
710 Value::from(raw.to_string())
711}
712
713fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
714 let mut segments = key.split('.').peekable();
715 let mut current = overrides;
716
717 while let Some(segment) = segments.next() {
718 if segments.peek().is_none() {
719 current.insert(segment.to_string(), value);
720 return Ok(());
721 }
722
723 let entry = current
724 .entry(segment.to_string())
725 .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
726
727 match entry {
728 Value::Object(dict) => {
729 current = dict;
730 }
731 _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
732 }
733 }
734
735 Ok(())
736}
737
738fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
739 let mut overrides: Map<String, Value> = Map::new();
740
741 if let Some(entries) = matches.get_many::<String>("config-override") {
742 for entry in entries {
743 let (key, value) = entry
744 .split_once('=')
745 .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
746 insert_override_path(&mut overrides, key, parse_override_value(value))?;
747 }
748 }
749
750 Ok(overrides)
751}
752
753#[actix::main]
754async fn main() -> anyhow::Result<()> {
755 std::panic::set_hook(Box::new(|_| {
757 if let Some(&pid) = ANVIL_PID.get() {
758 tracing::warn!("Panic detected, killing anvil process {}", pid);
759 use nix::sys::signal::{Signal, kill};
760 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
761 }
762 }));
763
764 lyquor_cli::setup_tracing()?;
765
766 println!("{}", lyquor_cli::format_logo_banner());
767
768 let matches = clap::command!()
769 .propagate_version(true)
770 .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
771 .arg(
772 clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
773 .required(false)
774 .action(clap::ArgAction::Append)
775 .value_parser(clap::builder::NonEmptyStringValueParser::new()),
776 )
777 .get_matches();
778
779 let config_path = matches.get_one::<String>("config").map(PathBuf::from);
780 let overrides = build_config_overrides(&matches)?;
781 let config = NodeConfig::load(config_path, overrides)?;
782
783 let node_seed = config.node_key.seed_bytes()?;
784 let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed);
785 let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
786 .listen_addr(config.network.upc_addr.clone())
787 .build()
788 .context("Failed to setup network.")?;
789 network.start().await.context("Failed to start network.")?;
790
791 let mut _anvil: Option<AnvilInstance> = None;
792 let profile: Arc<LyquorProfile> = Arc::new(match config.profile {
793 NetworkType::Devnet => {
794 let ws_url = match &config.force_seq_eth {
795 None => {
796 let anvil = start_devnet_anvil()?;
797 ANVIL_PID.set(anvil.child().id()).ok();
799 let ep = anvil.ws_endpoint();
800 _anvil = Some(anvil);
801 ep
802 }
803 Some(url) => url.to_string(),
804 };
805
806 Devnet::new(ws_url).into()
807 }
808 _ => return Err(anyhow::anyhow!("Network is not supported.")),
809 });
810 let seq_eth_url = config
811 .force_seq_eth
812 .clone()
813 .unwrap_or_else(|| profile.sequencer().to_string());
814
815 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
818 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
819
820 let node = Node {
821 sys: Subsystems::default(),
822 config: Arc::new(config),
823 profile: profile.clone(),
824 seq_eth_url,
825 started_anvil: _anvil.is_some(),
826 network: Some(network),
827 }
828 .start();
829
830 tokio::select! {
831 _ = sigint.recv() => {
832 tracing::info!("received SIGINT");
833 }
834 _ = sigterm.recv() => {
835 tracing::info!("received SIGTERM");
836 }
837 }
838
839 Ok(node.send(Stop).await?)
840}